You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:02 UTC
[3/5] flink git commit: [FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.
[FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.
print() now uses collect() internally
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9c15620
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9c15620
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9c15620
Branch: refs/heads/master
Commit: e9c1562034dabc34fa46d4fd8411321db0a6c637
Parents: 939e3fc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 28 10:52:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 13:12:45 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 38 ++++++++++++--------
.../flink/api/java/MultipleInvokationsTest.java | 2 +-
.../optimizer/DistinctCompilationTest.java | 6 ++--
.../java/GroupReduceCompilationTest.java | 12 +++----
.../optimizer/java/ReduceCompilationTest.java | 8 ++---
.../org/apache/flink/api/scala/DataSet.scala | 9 ++---
6 files changed, 44 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 18cb01d..4f2942e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -18,10 +18,7 @@
package org.apache.flink.api.java;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -50,7 +47,6 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
import org.apache.flink.api.java.operators.AggregateOperator;
@@ -86,8 +82,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/**
* A DataSet represents a collection of elements of the same type.<br/>
@@ -1336,11 +1335,17 @@ public abstract class DataSet<T> {
/**
* Writes a DataSet to the standard output stream (stdout).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
- *
- * @return The DataSink that writes the DataSet.
+ * This triggers execute() automatically.
*/
- public DataSink<T> print() {
- return output(new PrintingOutputFormat<T>(false));
+ public void print() {
+ try {
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.out.println(e);
+ }
+ } catch (Exception e) {
+ System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+ }
}
/**
@@ -1357,11 +1362,16 @@ public abstract class DataSet<T> {
/**
* Writes a DataSet to the standard error stream (stderr).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
- *
- * @return The DataSink that writes the DataSet.
*/
- public DataSink<T> printToErr() {
- return output(new PrintingOutputFormat<T>(true));
+ public void printToErr() {
+ try {
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.err.println(e);
+ }
+ } catch (Exception e) {
+ System.err.println("Could not retrieve values for printing: " + e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index 3638f70..c0ca6c2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,7 @@ public class MultipleInvokationsTest {
// ----------- Execution 1 ---------------
DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
- data.print().name("print1");
+ data.print();
data.output(new DiscardingOutputFormat<String>()).name("output1");
{
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 5827d9c..973f402 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -86,6 +85,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
+ System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
@@ -146,6 +146,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
+ System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
@@ -198,8 +199,9 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
+ System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 1bd4b8a..8fb4ef0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -51,7 +51,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
public void reduce(Iterable<Double> values, Collector<Double> out) {}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -97,7 +97,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print().name("sink");
+ reduced.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -148,7 +148,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -199,7 +199,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print().name("sink");
+ reduced.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -257,7 +257,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -317,7 +317,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print().name("sink");
+ reduced.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 4197abb..2958f1a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -53,7 +53,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return value1 + value2;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -98,7 +98,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return value1 + value2;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -151,7 +151,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return null;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -211,7 +211,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return null;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 15d2f4e..5198157 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1323,9 +1323,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
/**
* Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
* each element.
+ * This triggers execute() automatically.
*/
- def print(): DataSink[T] = {
- output(new PrintingOutputFormat[T](false))
+ def print() = {
+ javaSet.print()
}
/**
@@ -1342,8 +1343,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
* each element.
*/
- def printToErr(): DataSink[T] = {
- output(new PrintingOutputFormat[T](true))
+ def printToErr() = {
+ javaSet.printToErr()
}
/**