You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:02 UTC

[3/5] flink git commit: [FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.

[FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.

print() and printToErr() now use collect() internally


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9c15620
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9c15620
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9c15620

Branch: refs/heads/master
Commit: e9c1562034dabc34fa46d4fd8411321db0a6c637
Parents: 939e3fc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 28 10:52:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 13:12:45 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 38 ++++++++++++--------
 .../flink/api/java/MultipleInvokationsTest.java |  2 +-
 .../optimizer/DistinctCompilationTest.java      |  6 ++--
 .../java/GroupReduceCompilationTest.java        | 12 +++----
 .../optimizer/java/ReduceCompilationTest.java   |  8 ++---
 .../org/apache/flink/api/scala/DataSet.scala    |  9 ++---
 6 files changed, 44 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 18cb01d..4f2942e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.api.java;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -50,7 +47,6 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
 import org.apache.flink.api.java.operators.AggregateOperator;
@@ -86,8 +82,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A DataSet represents a collection of elements of the same type.<br/>
@@ -1336,11 +1335,17 @@ public abstract class DataSet<T> {
 	/**
 	 * Writes a DataSet to the standard output stream (stdout).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 * 
-	 *  @return The DataSink that writes the DataSet.
+	 * This triggers execute() automatically.
 	 */
-	public DataSink<T> print() {
-		return output(new PrintingOutputFormat<T>(false));
+	public void print() {
+		try {
+			List<T> elements = this.collect();
+			for (T e: elements) {
+				System.out.println(e);
+			}
+		} catch (Exception e) {
+			System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+		}
 	}
 
 	/**
@@ -1357,11 +1362,16 @@ public abstract class DataSet<T> {
 	/**
 	 * Writes a DataSet to the standard error stream (stderr).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 * 
-	 * @return The DataSink that writes the DataSet.
 	 */
-	public DataSink<T> printToErr() {
-		return output(new PrintingOutputFormat<T>(true));
+	public void printToErr() {
+		try {
+			List<T> elements = this.collect();
+			for (T e: elements) {
+				System.err.println(e);
+			}
+		} catch (Exception e) {
+			System.err.println("Could not retrieve values for printing: " + e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index 3638f70..c0ca6c2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,7 @@ public class MultipleInvokationsTest {
 			// ----------- Execution 1 ---------------
 			
 			DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
-			data.print().name("print1");
+			data.print();
 			data.output(new DiscardingOutputFormat<String>()).name("output1");
 			
 			{

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 5827d9c..973f402 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -86,6 +85,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
+			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -146,6 +146,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
+			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -198,8 +199,9 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
+			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 1bd4b8a..8fb4ef0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -51,7 +51,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
 				public void reduce(Iterable<Double> values, Collector<Double> out) {}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -97,7 +97,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print().name("sink");
+			reduced.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -148,7 +148,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -199,7 +199,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print().name("sink");
+			reduced.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -257,7 +257,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -317,7 +317,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print().name("sink");
+			reduced.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 4197abb..2958f1a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -53,7 +53,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -98,7 +98,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -151,7 +151,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -211,7 +211,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 15d2f4e..5198157 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1323,9 +1323,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
    * each element.
+   * This triggers execute() automatically.
    */
-  def print(): DataSink[T] = {
-    output(new PrintingOutputFormat[T](false))
+  def print() = {
+    javaSet.print()
   }
 
   /**
@@ -1342,8 +1343,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
    * each element.
    */
-  def printToErr(): DataSink[T] = {
-    output(new PrintingOutputFormat[T](true))
+  def printToErr() = {
+    javaSet.printToErr()
   }
 
   /**