You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:03 UTC

[4/5] flink git commit: [FLINK-1418] [apis] Minor cleanups for eager on-client print() statement.

[FLINK-1418] [apis] Minor cleanups for eager on-client print() statement.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6220f34b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6220f34b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6220f34b

Branch: refs/heads/master
Commit: 6220f34b626e9ed92b190042e0d69020e19461f1
Parents: 78d954b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 13:49:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 51 ++++++++++++--------
 .../flink/api/java/ExecutionEnvironment.java    | 10 ++--
 .../org/apache/flink/api/scala/DataSet.scala    | 41 ++++++++++------
 .../flink/api/scala/ExecutionEnvironment.scala  |  6 +--
 4 files changed, 66 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 133a083..157c666 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -338,7 +338,7 @@ public abstract class DataSet<T> {
 	 *
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
-	public AggregateOperator<T> sum (int field) {
+	public AggregateOperator<T> sum(int field) {
 		return aggregate(Aggregations.SUM, field);
 	}
 
@@ -997,7 +997,7 @@ public abstract class DataSet<T> {
 	 * <pre>
 	 * {@code
 	 * DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
-	 *                                                  initialState.iterateDelta(initialFeedbakSet, 100, 0);
+	 *                                                  initialState.iterateDelta(initialFeedbackSet, 100, 0);
 	 * 
 	 * DataSet<Tuple2<Long, Long>> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
 	 *                                              .join(iteration.getSolutionSet()).where(0).equalTo(0)
@@ -1236,7 +1236,7 @@ public abstract class DataSet<T> {
 	 * @see TextOutputFormat
 	 */
 	public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> formatter) {
-		return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath);
+		return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath);
 	}
 
 	/**
@@ -1250,8 +1250,8 @@ public abstract class DataSet<T> {
 	 *
 	 * @see TextOutputFormat
 	 */
-	public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter<T> formatter) {
-		return this.map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
+	public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, TextFormatter<T> formatter) {
+		return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
 	}
 	
 	/**
@@ -1333,11 +1333,16 @@ public abstract class DataSet<T> {
 	}
 	
 	/**
-	 * Writes a DataSet to the standard output stream (stdout).<br/>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 * This triggers execute() automatically.
+	 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
+	 * the print() method. For programs that are executed in a cluster, this method needs
+	 * to gather the contents of the DataSet back to the client, to print it there.
+	 * 
+	 * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+	 * 
+	 * <p>This method immediately triggers the program execution, similar to the
+	 * {@link #collect()} and {@link #count()} methods.</p>
 	 */
-	public void print() throws Exception{
+	public void print() throws Exception {
 		List<T> elements = this.collect();
 		for (T e: elements) {
 			System.out.println(e);
@@ -1345,6 +1350,23 @@ public abstract class DataSet<T> {
 	}
 
 	/**
+	 * Prints the elements in a DataSet to the standard error stream {@link System#err} of the JVM that calls
+	 * the printToErr() method. For programs that are executed in a cluster, this method needs
+	 * to gather the contents of the DataSet back to the client, to print it there.
+	 *
+	 * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+	 *
+	 * <p>This method immediately triggers the program execution, similar to the
+	 * {@link #collect()} and {@link #count()} methods.</p>
+	 */
+	public void printToErr() throws Exception {
+		List<T> elements = this.collect();
+		for (T e: elements) {
+			System.err.println(e);
+		}
+	}
+	
+	/**
 	 * Writes a DataSet to the standard output stream (stdout).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
 	 *
@@ -1354,17 +1376,6 @@ public abstract class DataSet<T> {
 	public DataSink<T> print(String sinkIdentifier) {
 		return output(new PrintingOutputFormat<T>(sinkIdentifier, false));
 	}
-	
-	/**
-	 * Writes a DataSet to the standard error stream (stderr).<br/>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 */
-	public void printToErr() throws Exception{
-		List<T> elements = this.collect();
-		for (T e: elements) {
-			System.err.println(e);
-		}
-	}
 
 	/**
 	 * Writes a DataSet to the standard error stream (stderr).<br/>

http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0f65b79..9c76409 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,9 +94,6 @@ import com.google.common.base.Preconditions;
  */
 public abstract class ExecutionEnvironment {
 
-
-	protected JobExecutionResult lastJobExecutionResult;
-
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 	
 	/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -116,8 +113,11 @@ public abstract class ExecutionEnvironment {
 	
 	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
-	private ExecutionConfig config = new ExecutionConfig();
+	private final ExecutionConfig config = new ExecutionConfig();
 
+	/** Result from the latest execution, kept so that it is retrievable when using eager execution methods */
+	protected JobExecutionResult lastJobExecutionResult;
+	
 	/** Flag to indicate whether sinks have been cleared in previous executions */
 	private boolean wasExecuted = false;
 
@@ -240,6 +240,8 @@ public abstract class ExecutionEnvironment {
 
 	/**
 	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+	 * 
+	 * @return The execution result from the latest job execution.
 	 */
 	public JobExecutionResult getLastJobExecutionResult(){
 		return this.lastJobExecutionResult;

http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5198157..e283e95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1319,16 +1319,37 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-
+  
   /**
-   * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
-   * each element.
-   * This triggers execute() automatically.
+   * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
+   * JVM that calls the print() method. For programs that are executed in a cluster, this
+   * method needs to gather the contents of the DataSet back to the client, to print it
+   * there.
+   *
+   * The string written for each element is defined by the [[AnyRef.toString]] method.
+   *
+   * This method immediately triggers the program execution, similar to the
+   * [[collect()]] and [[count()]] methods.
    */
-  def print() = {
+  def print(): Unit = {
     javaSet.print()
   }
-
+  
+  /**
+   * Prints the elements in a DataSet to the standard error stream [[System.err]] of the
+   * JVM that calls the printToErr() method. For programs that are executed in a cluster, this
+   * method needs to gather the contents of the DataSet back to the client, to print it
+   * there.
+   *
+   * The string written for each element is defined by the [[AnyRef.toString]] method.
+   *
+   * This method immediately triggers the program execution, similar to the
+   * [[collect()]] and [[count()]] methods.
+   */
+  def printToErr(): Unit = {
+    javaSet.printToErr()
+  }
+  
   /**
    * *
    * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
@@ -1340,14 +1361,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
-   * each element.
-   */
-  def printToErr() = {
-    javaSet.printToErr()
-  }
-
-  /**
    * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
    * @param sinkIdentifier The string to prefix the output with.

http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6cc327a..e01ff3b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -130,11 +130,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getId: UUID = {
     javaEnv.getId
   }
-
-
+  
   /**
-   * retrieves JobExecutionResult from last job execution (for "eager" print)
-   * @return JobExecutionResult form last job execution
+   * Gets the JobExecutionResult of the last executed job.
    */
   def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult