You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:00 UTC
[1/5] flink git commit: [FLINK-1418] [apis] Fix eager print() and
adjust all tests and examples to not fail due to "eager" print method
Repository: flink
Updated Branches:
refs/heads/master 939e3fc40 -> ad1d9362c
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index ff429b8..65b9756 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -55,7 +56,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
input1
.join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -114,7 +115,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
input1
.join(input2, JoinHint.REPARTITION_HASH_FIRST)
.where("b").equalTo("a").withPartitioner(partitioner)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -176,7 +177,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
.where(new Pojo2KeySelector())
.equalTo(new Pojo3KeySelector())
.withPartitioner(partitioner)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -248,7 +249,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
grouped
.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
.with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
index ab83dba..25b17f8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
import static org.junit.Assert.fail;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -54,7 +55,7 @@ public class DeltaIterationDependenciesTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
- result.print();
+ result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index 96758b1..e5b6ad5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
import static org.junit.Assert.*;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
@@ -48,7 +49,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
data.distinct(0)
.groupBy(0)
.sum(1)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -84,7 +85,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
data.distinct(1)
.groupBy(0)
.sum(1)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 8fb4ef0..6b49dd4 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -51,7 +52,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
public void reduce(Iterable<Double> values, Collector<Double> out) {}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Double>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -97,7 +98,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print();
+ reduced.output(new DiscardingOutputFormat<Long>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -148,7 +149,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -199,7 +200,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print();
+ reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -257,7 +258,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -317,7 +318,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print();
+ reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 796d4ab..bcfb2ef 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -48,7 +49,7 @@ public class IterationCompilerTest extends CompilerTestBase {
env.setParallelism(43);
IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
- iteration.closeWith(iteration).print();
+ iteration.closeWith(iteration).output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -76,7 +77,7 @@ public class IterationCompilerTest extends CompilerTestBase {
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
iter.closeWith(iter.getWorkset(), iter.getWorkset())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -99,7 +100,7 @@ public class IterationCompilerTest extends CompilerTestBase {
iteration.closeWith(
iteration.map(new IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>())))
- .print();
+ .output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -150,7 +151,7 @@ public class IterationCompilerTest extends CompilerTestBase {
.union(
iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
index 14d863d..b3718b0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -131,7 +133,7 @@ public class JoinTranslationTest extends CompilerTestBase {
DataSet<Long> i1 = env.generateSequence(1, 1000);
DataSet<Long> i2 = env.generateSequence(1, 1000);
- i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+ i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan plan = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
index 3f18e62..e1e6b5f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,7 +46,7 @@ public class OpenIterationTest extends CompilerTestBase {
DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
- mapped.print();
+ mapped.output(new DiscardingOutputFormat<Long>());
try {
env.createProgramPlan();
@@ -72,9 +73,9 @@ public class OpenIterationTest extends CompilerTestBase {
DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
- iteration.closeWith(mapped).print();
+ iteration.closeWith(mapped).output(new DiscardingOutputFormat<Long>());
- mapped.print();
+ mapped.output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
@@ -104,7 +105,7 @@ public class OpenIterationTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
- mapped.print();
+ mapped.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
try {
env.createProgramPlan();
@@ -132,7 +133,7 @@ public class OpenIterationTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
- mapped.print();
+ mapped.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
try {
env.createProgramPlan();
@@ -164,7 +165,7 @@ public class OpenIterationTest extends CompilerTestBase {
.where(0).equalTo(0).projectFirst(1).projectSecond(0);
iteration.closeWith(joined, joined)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 9c2d0d2..7f5c209 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -49,8 +50,8 @@ public class PartitionOperatorTest extends CompilerTestBase {
public int partition(Long key, int numPartitions) { return key.intValue(); }
}, 1)
.groupBy(1)
- .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 2958f1a..942aa47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -53,7 +54,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return value1 + value2;
}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Double>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -98,7 +99,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return value1 + value2;
}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Long>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -151,7 +152,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return null;
}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -211,7 +212,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return null;
}
}).name("reducer")
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 46eb48a..3d6d90b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -294,7 +295,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
joinedWithSolutionSet;
iter.closeWith(nextSolutionSet, nextWorkset)
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
return env.createProgramPlan();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index fb7a80f..b23bf35 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.Optimizer;
@@ -48,7 +49,7 @@ public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
input.coGroup(input).where(0).equalTo(0)
.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
.withParameters(cfg)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
Plan p = env.createProgramPlan();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 8a4786f..a4e520b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.Optimizer;
@@ -46,7 +47,7 @@ public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
input.join(input).where(0).equalTo(0)
.withParameters(cfg)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
Plan p = env.createProgramPlan();
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index efa1e88..6cc327a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -131,6 +131,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.getId
}
+
+ /**
+ * retrieves JobExecutionResult from last job execution (for "eager" print)
+ * @return JobExecutionResult from last job execution
+ */
+ def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult
+
/**
* Gets the UUID by which this environment is identified, as a string.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index a1bd2e0..b4a27b6 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -19,6 +19,7 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
@@ -296,7 +297,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
int parentID = (Integer) receiver.getRecord(true);
DataSet parent = (DataSet) sets.get(parentID);
boolean toError = (Boolean) receiver.getRecord();
- (toError ? parent.printToErr() : parent.print()).name("PrintSink");
+ parent.output(new PrintingOutputFormat(toError));
}
private void createBroadcastVariable() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index d0b0164..018daf8 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -59,7 +60,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
- result.print();
+ result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
}
Plan p = env.createProgramPlan("Spargel Connected Components");
@@ -134,7 +135,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
- result.print();
+ result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
}
Plan p = env.createProgramPlan("Spargel Connected Components");
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
index 2840914..7189bbe 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory;
public class CollectionTestEnvironment extends CollectionEnvironment {
- protected JobExecutionResult latestResult;
-
@Override
public JobExecutionResult execute() throws Exception {
return execute("test job");
@@ -35,7 +33,7 @@ public class CollectionTestEnvironment extends CollectionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
JobExecutionResult result = super.execute(jobName);
- this.latestResult = result;
+ this.lastJobExecutionResult = result;
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 2214000..e639c80 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -119,7 +119,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
// call the test program
try {
testProgram();
- this.latestExecutionResult = env.latestResult;
+ this.latestExecutionResult = env.getLastJobExecutionResult();
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -171,7 +171,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
// call the test program
try {
testProgram();
- this.latestExecutionResult = env.latestResult;
+ this.latestExecutionResult = env.getLastJobExecutionResult();
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -224,7 +224,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
// call the test program
try {
testProgram();
- this.latestExecutionResult = env.latestResult;
+ this.latestExecutionResult = env.getLastJobExecutionResult();
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index cf1caeb..25f2c83 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -36,9 +36,6 @@ public class TestEnvironment extends ExecutionEnvironment {
private final ForkableFlinkMiniCluster executor;
- protected JobExecutionResult latestResult;
-
-
public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
this.executor = executor;
setParallelism(parallelism);
@@ -54,8 +51,8 @@ public class TestEnvironment extends ExecutionEnvironment {
SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
- this.latestResult = result.toJobExecutionResult(getClass().getClassLoader());
- return this.latestResult;
+ this.lastJobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
+ return this.lastJobExecutionResult;
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index 27c1644..aea448f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.operators.DriverStrategy;
@@ -58,8 +59,8 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
// add two sinks, to test the case of branching after an iteration
- result.print();
- result.print();
+ result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+ result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 4edd68e..a3b7572 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -23,6 +23,7 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -85,7 +86,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
// termination condition
.filter(new EpsilonFilter()));
- finalPageRanks.print();
+ finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
// get the plan and compile it
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index 25cc089..2775d09 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.junit.Test
import org.apache.flink.api.common.InvalidProgramException
@@ -37,7 +38,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int, String)])
}
@Test
@@ -51,7 +52,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
@Test(expected = classOf[InvalidProgramException])
@@ -65,7 +66,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
@Test(expected = classOf[InvalidProgramException])
@@ -79,7 +80,8 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print() }
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
+ }
@Test(expected = classOf[InvalidProgramException])
def testIncorrectJoinWithSolution3(): Unit = {
@@ -92,7 +94,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
@Test
@@ -106,7 +108,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
@Test
@@ -120,7 +122,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
@Test(expected = classOf[InvalidProgramException])
@@ -134,7 +136,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
@Test(expected = classOf[InvalidProgramException])
@@ -148,7 +150,8 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print() }
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
+ }
@Test(expected = classOf[InvalidProgramException])
def testIncorrectCoGroupWithSolution3(): Unit = {
@@ -161,6 +164,6 @@ class DeltaIterationSanityCheckTest extends Serializable {
(result, ws)
}
- iteration.print()
+ iteration.output(new DiscardingOutputFormat[(Int,String)])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
index 3fefa01..97a0f87 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.compiler
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Test
import org.junit.Assert._
@@ -38,8 +39,8 @@ class PartitionOperatorTranslationTest extends CompilerTestBase {
def partition(key: Long, numPartitions: Int): Int = key.intValue()
}, 1)
.groupBy(1).reduceGroup( x => x)
- .print()
-
+ .output(new DiscardingOutputFormat[Iterator[(Long, Long)]])
+
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
index eecc347..cc2c81e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.api.scala.functions
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.junit.Assert._
import org.apache.flink.api.common.functions.RichJoinFunction
import org.apache.flink.api.common.functions.RichMapFunction
@@ -46,7 +47,8 @@ class SemanticPropertiesTranslationTest {
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements((3L, "test", 42))
- input.map(new WildcardForwardMapper[(Long, String, Int)]).print()
+ input.map(new WildcardForwardMapper[(Long, String, Int)])
+ .output(new DiscardingOutputFormat[(Long, String, Int)])
val plan = env.createProgramPlan()
@@ -83,7 +85,8 @@ class SemanticPropertiesTranslationTest {
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements((3L, "test", 42))
- input.map(new IndividualForwardMapper[Long, String, Int]).print()
+ input.map(new IndividualForwardMapper[Long, String, Int])
+ .output(new DiscardingOutputFormat[(Long, String, Int)])
val plan = env.createProgramPlan()
@@ -120,7 +123,8 @@ class SemanticPropertiesTranslationTest {
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements((3L, "test", 42))
- input.map(new FieldTwoForwardMapper[Long, String, Int]).print()
+ input.map(new FieldTwoForwardMapper[Long, String, Int])
+ .output(new DiscardingOutputFormat[(Long, String, Int)])
val plan = env.createProgramPlan()
@@ -160,7 +164,8 @@ class SemanticPropertiesTranslationTest {
val input2 = env.fromElements((3L, 3.1415))
input1.join(input2).where(0).equalTo(0)(
- new ForwardingTupleJoin[Long, String, Long, Double]).print()
+ new ForwardingTupleJoin[Long, String, Long, Double])
+ .output(new DiscardingOutputFormat[(String, Long)])
val plan = env.createProgramPlan()
val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
@@ -204,7 +209,8 @@ class SemanticPropertiesTranslationTest {
val input2 = env.fromElements((3L, 42))
input1.join(input2).where(0).equalTo(0)(
- new ForwardingBasicJoin[(Long, String), (Long, Int)]).print()
+ new ForwardingBasicJoin[(Long, String), (Long, Int)])
+ .output(new DiscardingOutputFormat[((Long, String), (Long, Int))])
val plan = env.createProgramPlan()
val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
index 425cff6..6babbe7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._
import org.junit.Assert.{assertEquals, assertTrue, fail}
import org.junit.Test
@@ -37,7 +38,8 @@ class AggregateTranslationTest {
val initialData = env.fromElements((3.141592, "foobar", 77L))
- initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print()
+ initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2)
+ .output(new DiscardingOutputFormat[(Double, String, Long)])
val p: Plan = env.createProgramPlan()
val sink = p.getDataSinks.iterator.next
@@ -55,6 +57,7 @@ class AggregateTranslationTest {
System.err.println(e.getMessage)
e.printStackTrace()
fail("Test caused an error: " + e.getMessage)
+
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
index 8d75f2e..4d85c58 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Assert._
import org.junit.Test
@@ -46,7 +47,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
.coGroup(input2)
.where(1).equalTo(0)
.withPartitioner(partitioner)
- .print()
+ .output(new DiscardingOutputFormat[(Array[(Long, Long)], Array[(Long, Long, Long)])])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -110,7 +111,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
.coGroup(input2)
.where("b").equalTo("a")
.withPartitioner(partitioner)
- .print()
+ .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -174,7 +175,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
.coGroup(input2)
.where( _.a ).equalTo( _.b )
.withPartitioner(partitioner)
- .print()
+ .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])])
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
index 8d816ee..a0f93dd 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Assert._
import org.junit.Test
@@ -49,7 +50,7 @@ class CoGroupGroupSortTranslationTest {
.sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) {
(first, second) => first.buffered.head
}
- .print()
+ .output(new DiscardingOutputFormat[(Long, Long)])
val p = env.createProgramPlan()
@@ -92,7 +93,7 @@ class CoGroupGroupSortTranslationTest {
.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) {
(first, second) => first.buffered.head
}
- .print()
+ .output(new DiscardingOutputFormat[(Long, Long)])
val p = env.createProgramPlan()
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 395f36a..f81cb84 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Assert._
import org.junit.Test
@@ -40,7 +41,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
data
.groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
.reduce( (a,b) => a )
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -73,7 +74,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
data
.groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
.reduce( (a, b) => a)
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -107,7 +108,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
.withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
.reduce( (a,b) => a)
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -141,7 +142,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
.withPartitioner(new TestPartitionerInt())
.sortGroup(_._2, Order.ASCENDING)
.reduce( (a,b) => a)
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -175,7 +176,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
.sortGroup(1, Order.ASCENDING)
.sortGroup(2, Order.DESCENDING)
.reduce( (a,b) => a)
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int, Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
index a02d2af..6e40ea5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Assert._
import org.junit.Test
@@ -41,7 +42,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
data
.groupBy("a").withPartitioner(new TestPartitionerInt())
.reduce( (a,b) => a )
- .print()
+ .output(new DiscardingOutputFormat[Pojo2])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -72,7 +73,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
data
.groupBy("a").withPartitioner(new TestPartitionerInt())
.reduceGroup( iter => Seq(iter.next) )
- .print()
+ .output(new DiscardingOutputFormat[Seq[Pojo2]])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -102,7 +103,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
.groupBy("a").withPartitioner(new TestPartitionerInt())
.sortGroup("b", Order.ASCENDING)
.reduceGroup( iter => Seq(iter.next) )
- .print()
+ .output(new DiscardingOutputFormat[Seq[Pojo3]])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -133,7 +134,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
.sortGroup("b", Order.ASCENDING)
.sortGroup("c", Order.DESCENDING)
.reduceGroup( iter => Seq(iter.next) )
- .print()
+ .output(new DiscardingOutputFormat[Seq[Pojo4]])
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
index 25efe48..b103e9c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.junit.Assert._
import org.junit.Test
@@ -42,7 +43,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
data.groupBy(0)
.withPartitioner(new TestPartitionerInt())
.sum(1)
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -73,7 +74,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
data
.groupBy(0).withPartitioner(new TestPartitionerInt())
.reduce( (a,b) => a )
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -104,7 +105,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
data
.groupBy(0).withPartitioner(new TestPartitionerInt())
.reduceGroup( iter => Seq(iter.next) )
- .print()
+ .output(new DiscardingOutputFormat[Seq[(Int, Int)]])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -134,7 +135,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
.groupBy(0).withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
.reduceGroup( iter => Seq(iter.next) )
- .print()
+ .output(new DiscardingOutputFormat[Seq[(Int, Int, Int)]])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -165,7 +166,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
.sortGroup(1, Order.ASCENDING)
.sortGroup(2, Order.DESCENDING)
.reduceGroup( iter => Seq(iter.next) )
- .print()
+ .output(new DiscardingOutputFormat[Seq[(Int, Int, Int, Int)]])
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index fe30376..7ebf378 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Test
@@ -43,7 +44,7 @@ class CustomPartitioningTest extends CompilerTestBase {
data.partitionCustom(part, 0)
.mapPartition( x => x )
- .print()
+ .output(new DiscardingOutputFormat[(Int, Int)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -113,7 +114,7 @@ class CustomPartitioningTest extends CompilerTestBase {
data
.partitionCustom(part, "a")
.mapPartition( x => x)
- .print()
+ .output(new DiscardingOutputFormat[Pojo])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -184,7 +185,7 @@ class CustomPartitioningTest extends CompilerTestBase {
data
.partitionCustom(part, pojo => pojo.a)
.mapPartition( x => x)
- .print()
+ .output(new DiscardingOutputFormat[Pojo])
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
index 6aa4d75..9a400c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.operators.translation
import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction,
RichJoinFunction}
import org.apache.flink.api.common.operators.GenericDataSinkBase
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.operators.translation.WrappingFunction
import org.junit.Assert.assertArrayEquals
import org.junit.Assert.assertEquals
@@ -67,7 +68,7 @@ class DeltaIterationTranslationTest {
.setParallelism(ITERATION_PARALLELISM)
.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator)
- result.print()
+ result.output(new DiscardingOutputFormat[(Double, Long, String)])
result.writeAsText("/dev/null")
val p: Plan = env.createProgramPlan(JOB_NAME)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
index 7836400..c540f61 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.junit.Assert
import org.junit.Test
@@ -31,7 +32,7 @@ class DistinctTranslationTest {
val input = env.fromElements("1", "2", "1", "3")
val op = input.distinct { x => x}
- op.print()
+ op.output(new DiscardingOutputFormat[String])
val p = env.createProgramPlan()
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index 2467596..eae3db1 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.optimizer.util.CompilerTestBase
import org.junit.Assert._
import org.junit.Test
@@ -46,7 +47,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
.join(input2, JoinHint.REPARTITION_HASH_FIRST)
.where(1).equalTo(0)
.withPartitioner(partitioner)
- .print()
+ .output(new DiscardingOutputFormat[((Long, Long), (Long, Long, Long))])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -110,7 +111,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
.join(input2, JoinHint.REPARTITION_HASH_FIRST)
.where("b").equalTo("a")
.withPartitioner(partitioner)
- .print()
+ .output(new DiscardingOutputFormat[(Pojo2, Pojo3)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
@@ -174,7 +175,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
.join(input2, JoinHint.REPARTITION_HASH_FIRST)
.where( _.a ).equalTo( _.b )
.withPartitioner(partitioner)
- .print()
+ .output(new DiscardingOutputFormat[(Pojo2, Pojo3)])
val p = env.createProgramPlan()
val op = compileNoStats(p)
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
index e97fc21..5d3878c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.operators.translation
import org.apache.flink.api.common.operators.{GenericDataSourceBase, GenericDataSinkBase}
+import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper,
PlanUnwrappingReduceOperator}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -39,7 +40,8 @@ class ReduceTranslationTest {
val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
- initialData reduce { (v1, v2) => v1 } print()
+ initialData reduce { (v1, v2) => v1 } output(
+ new DiscardingOutputFormat[(Double, String, Long)])
val p = env.createProgramPlan(
@@ -70,7 +72,8 @@ class ReduceTranslationTest {
val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
- initialData.groupBy(2) reduce { (v1, v2) => v1 } print()
+ initialData.groupBy(2) reduce { (v1, v2) => v1 } output(
+ new DiscardingOutputFormat[(Double, String, Long)])
val p = env.createProgramPlan()
@@ -99,7 +102,8 @@ class ReduceTranslationTest {
val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
- initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) print()
+ initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) output(
+ new DiscardingOutputFormat[(Double, String, Long)])
val p = env.createProgramPlan()
val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 07300da..e22e0ef 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
import org.apache.hadoop.fs.Path;
@@ -445,13 +446,32 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
yc.init(yarnConfiguration);
yc.start();
+ // get temporary folder for writing output of wordcount example
+ File tmpOutFolder = null;
+ try{
+ tmpOutFolder = tmp.newFolder();
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // get temporary file for reading input data for wordcount example
+ File tmpInFile = null;
+ try{
+ tmpInFile = tmp.newFile();
+ FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+
Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
"-yn", "1",
"-yjm", "512",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
"-ytm", "1024",
"-ys", "2", // test requesting slots from YARN.
- "--yarndetached", job},
+ "--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
"The Job has been submitted with JobID",
RunTypes.CLI_FRONTEND);
@@ -490,19 +510,26 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
final ApplicationId id = tmpAppId;
// now it has finished.
- // check the output.
- File taskmanagerOut = YarnTestBase.findFile("..", new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.contains("taskmanager") && name.contains("stdout") && dir.getAbsolutePath().contains(id.toString());
+ // check the output files.
+ File[] listOfOutputFiles = tmpOutFolder.listFiles();
+
+
+ Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
+ LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
+
+ // read all output files in output folder to one output string
+ String content = "";
+ for(File f:listOfOutputFiles)
+ {
+ if(f.isFile())
+ {
+ content += FileUtils.readFileToString(f) + "\n";
}
- });
- Assert.assertNotNull("Taskmanager output not found", taskmanagerOut);
- LOG.info("The job has finished. TaskManager output file found {}", taskmanagerOut.getAbsolutePath());
- String content = FileUtils.readFileToString(taskmanagerOut);
+ }
+ //String content = FileUtils.readFileToString(taskmanagerOut);
// check for some of the wordcount outputs.
- Assert.assertTrue("Expected string '(all,2)' not found in string '"+content+"'", content.contains("(all,2)"));
- Assert.assertTrue("Expected string '(mind,1)' not found in string'"+content+"'", content.contains("(mind,1)"));
+ Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
+ Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
// check if the heap size for the TaskManager was set correctly
File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {
[3/5] flink git commit: [FLINK-1418] [apis] Change print() to print
on the client and to eagerly execute the program.
Posted by se...@apache.org.
[FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.
print() now uses collect() internally
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9c15620
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9c15620
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9c15620
Branch: refs/heads/master
Commit: e9c1562034dabc34fa46d4fd8411321db0a6c637
Parents: 939e3fc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 28 10:52:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 13:12:45 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 38 ++++++++++++--------
.../flink/api/java/MultipleInvokationsTest.java | 2 +-
.../optimizer/DistinctCompilationTest.java | 6 ++--
.../java/GroupReduceCompilationTest.java | 12 +++----
.../optimizer/java/ReduceCompilationTest.java | 8 ++---
.../org/apache/flink/api/scala/DataSet.scala | 9 ++---
6 files changed, 44 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 18cb01d..4f2942e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -18,10 +18,7 @@
package org.apache.flink.api.java;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -50,7 +47,6 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
import org.apache.flink.api.java.operators.AggregateOperator;
@@ -86,8 +82,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/**
* A DataSet represents a collection of elements of the same type.<br/>
@@ -1336,11 +1335,17 @@ public abstract class DataSet<T> {
/**
* Writes a DataSet to the standard output stream (stdout).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
- *
- * @return The DataSink that writes the DataSet.
+ * This triggers execute() automatically.
*/
- public DataSink<T> print() {
- return output(new PrintingOutputFormat<T>(false));
+ public void print() {
+ try {
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.out.println(e);
+ }
+ } catch (Exception e) {
+ System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+ }
}
/**
@@ -1357,11 +1362,16 @@ public abstract class DataSet<T> {
/**
* Writes a DataSet to the standard error stream (stderr).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
- *
- * @return The DataSink that writes the DataSet.
*/
- public DataSink<T> printToErr() {
- return output(new PrintingOutputFormat<T>(true));
+ public void printToErr() {
+ try {
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.err.println(e);
+ }
+ } catch (Exception e) {
+ System.err.println("Could not retrieve values for printing: " + e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index 3638f70..c0ca6c2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,7 @@ public class MultipleInvokationsTest {
// ----------- Execution 1 ---------------
DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
- data.print().name("print1");
+ data.print();
data.output(new DiscardingOutputFormat<String>()).name("output1");
{
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 5827d9c..973f402 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -86,6 +85,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
+ System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
@@ -146,6 +146,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
+ System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
@@ -198,8 +199,9 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(8, sinkNode.getParallelism());
}
catch (Exception e) {
+ System.err.println(e.getMessage());
e.printStackTrace();
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 1bd4b8a..8fb4ef0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -51,7 +51,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
public void reduce(Iterable<Double> values, Collector<Double> out) {}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -97,7 +97,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print().name("sink");
+ reduced.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -148,7 +148,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -199,7 +199,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print().name("sink");
+ reduced.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -257,7 +257,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -317,7 +317,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
}).name("reducer");
reduced.setCombinable(true);
- reduced.print().name("sink");
+ reduced.print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 4197abb..2958f1a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -53,7 +53,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return value1 + value2;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -98,7 +98,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return value1 + value2;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -151,7 +151,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return null;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -211,7 +211,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
return null;
}
}).name("reducer")
- .print().name("sink");
+ .print();
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 15d2f4e..5198157 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1323,9 +1323,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
/**
* Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
* each element.
+ * This triggers execute() automatically.
*/
- def print(): DataSink[T] = {
- output(new PrintingOutputFormat[T](false))
+ def print() = {
+ javaSet.print()
}
/**
@@ -1342,8 +1343,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
* each element.
*/
- def printToErr(): DataSink[T] = {
- output(new PrintingOutputFormat[T](true))
+ def printToErr() = {
+ javaSet.printToErr()
}
/**
[2/5] flink git commit: [FLINK-1418] [apis] Fix eager print() and
adjust all tests and examples to not fail due to "eager" print method
Posted by se...@apache.org.
[FLINK-1418] [apis] Fix eager print() and adjust all tests and examples to not fail due to "eager" print method
- Add lastJobExecutionResult to retrieve the result of the most recent run when using "eager" execution methods
This closes #699
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78d954b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78d954b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78d954b8
Branch: refs/heads/master
Commit: 78d954b858933f8f8ecc1ace01839b4e6080def1
Parents: e9c1562
Author: Nikolaas Steenbergen <Ni...@googlemail.com>
Authored: Tue May 5 10:49:54 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 13:12:45 2015 +0200
----------------------------------------------------------------------
.../client/program/ContextEnvironment.java | 4 +-
.../java/graph/ConnectedComponents.java | 6 +-
.../examples/java/graph/EnumTrianglesBasic.java | 6 +-
.../examples/java/graph/EnumTrianglesOpt.java | 6 +-
.../examples/java/graph/PageRankBasic.java | 4 +-
.../flink/examples/java/misc/PiEstimation.java | 2 -
.../examples/java/ml/LinearRegression.java | 5 +-
.../relational/EmptyFieldsCountAccumulator.java | 7 +-
.../java/relational/WebLogAnalysis.java | 6 +-
.../examples/java/wordcount/PojoExample.java | 6 +-
.../examples/java/wordcount/WordCount.java | 5 +-
.../examples/scala/clustering/KMeans.scala | 2 +-
.../scala/graph/ConnectedComponents.scala | 2 +-
.../examples/scala/graph/DeltaPageRank.scala | 1 -
.../scala/graph/EnumTrianglesBasic.scala | 5 +-
.../examples/scala/graph/EnumTrianglesOpt.scala | 4 +-
.../examples/scala/graph/PageRankBasic.scala | 5 +-
.../scala/graph/TransitiveClosureNaive.scala | 2 +-
.../examples/scala/misc/PiEstimation.scala | 2 -
.../examples/scala/ml/LinearRegression.scala | 2 +-
.../scala/relational/WebLogAnalysis.scala | 2 +-
.../examples/scala/wordcount/WordCount.scala | 2 +-
.../flink/api/java/CollectionEnvironment.java | 3 +-
.../java/org/apache/flink/api/java/DataSet.java | 26 ++---
.../flink/api/java/ExecutionEnvironment.java | 13 ++-
.../apache/flink/api/java/LocalEnvironment.java | 3 +-
.../flink/api/java/RemoteEnvironment.java | 4 +-
.../flink/api/java/MultipleInvokationsTest.java | 3 +-
.../SemanticPropertiesProjectionTest.java | 18 ++-
.../SemanticPropertiesTranslationTest.java | 31 +++---
.../translation/AggregateTranslationTest.java | 3 +-
.../translation/CoGroupSortTranslationTest.java | 5 +-
.../DeltaIterationTranslationTest.java | 3 +-
.../translation/DistinctTranslationTest.java | 13 ++-
.../translation/ReduceTranslationTests.java | 7 +-
.../optimizer/DistinctCompilationTest.java | 1 +
.../WorksetIterationCornerCasesTest.java | 22 +++-
...naryCustomPartitioningCompatibilityTest.java | 5 +-
.../CoGroupCustomPartitioningTest.java | 9 +-
...ustomPartitioningGlobalOptimizationTest.java | 5 +-
.../custompartition/CustomPartitioningTest.java | 7 +-
.../GroupingKeySelectorTranslationTest.java | 9 +-
.../GroupingPojoTranslationTest.java | 111 ++++++++++---------
.../GroupingTupleTranslationTest.java | 11 +-
.../JoinCustomPartitioningTest.java | 9 +-
.../java/DeltaIterationDependenciesTest.java | 3 +-
.../java/DistinctAndGroupingOptimizerTest.java | 5 +-
.../java/GroupReduceCompilationTest.java | 13 ++-
.../optimizer/java/IterationCompilerTest.java | 9 +-
.../optimizer/java/JoinTranslationTest.java | 4 +-
.../flink/optimizer/java/OpenIterationTest.java | 13 ++-
.../optimizer/java/PartitionOperatorTest.java | 5 +-
.../optimizer/java/ReduceCompilationTest.java | 9 +-
.../WorksetIterationsJavaApiCompilerTest.java | 3 +-
.../CoGroupOnConflictingPartitioningsTest.java | 3 +-
.../JoinOnConflictingPartitioningsTest.java | 3 +-
.../flink/api/scala/ExecutionEnvironment.scala | 7 ++
.../api/java/common/PlanBinder.java | 3 +-
.../flink/spargel/java/SpargelCompilerTest.java | 5 +-
.../test/util/CollectionTestEnvironment.java | 4 +-
.../flink/test/util/JavaProgramTestBase.java | 6 +-
.../apache/flink/test/util/TestEnvironment.java | 7 +-
...ultipleJoinsWithSolutionSetCompilerTest.java | 5 +-
.../iterations/PageRankCompilerTest.java | 3 +-
.../scala/DeltaIterationSanityCheckTest.scala | 23 ++--
.../PartitionOperatorTranslationTest.scala | 5 +-
.../SemanticPropertiesTranslationTest.scala | 16 ++-
.../translation/AggregateTranslationTest.scala | 5 +-
.../CoGroupCustomPartitioningTest.scala | 7 +-
.../CoGroupGroupSortTranslationTest.scala | 5 +-
...tomPartitioningGroupingKeySelectorTest.scala | 11 +-
.../CustomPartitioningGroupingPojoTest.scala | 9 +-
.../CustomPartitioningGroupingTupleTest.scala | 11 +-
.../translation/CustomPartitioningTest.scala | 7 +-
.../DeltaIterationTranslationTest.scala | 3 +-
.../translation/DistinctTranslationTest.scala | 3 +-
.../JoinCustomPartitioningTest.scala | 7 +-
.../translation/ReduceTranslationTest.scala | 10 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 51 +++++++--
79 files changed, 396 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 35e5846..9287017 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -62,10 +62,12 @@ public class ContextEnvironment extends ExecutionEnvironment {
JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
if(result instanceof JobExecutionResult) {
+ this.lastJobExecutionResult = (JobExecutionResult) result;
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
- return new JobExecutionResult(result.getJobID(), -1, null);
+ this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+ return this.lastJobExecutionResult;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index bd74b20..827bb25 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -111,12 +111,12 @@ public class ConnectedComponents implements ProgramDescription {
// emit result
if(fileOutput) {
result.writeAsCsv(outputPath, "\n", " ");
+
+ // execute program
+ env.execute("Connected Components Example");
} else {
result.print();
}
-
- // execute program
- env.execute("Connected Components Example");
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
index 5af60be..fdbe197 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -106,13 +106,13 @@ public class EnumTrianglesBasic {
// emit result
if(fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",");
+
+ // execute program
+ env.execute("Basic Triangle Enumeration Example");
} else {
triangles.print();
}
- // execute program
- env.execute("Basic Triangle Enumeration Example");
-
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
index fb1e6f5..56b448e 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -121,12 +121,12 @@ public class EnumTrianglesOpt {
// emit result
if(fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",");
+ // execute program
+ env.execute("Triangle Enumeration Example");
} else {
triangles.print();
}
-
- // execute program
- env.execute("Triangle Enumeration Example");
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
index d622799..a374d0c 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -122,12 +122,12 @@ public class PageRankBasic {
// emit result
if(fileOutput) {
finalPageRanks.writeAsCsv(outputPath, "\n", " ");
+ // execute program
+ env.execute("Basic Page Rank Example");
} else {
finalPageRanks.print();
}
- // execute program
- env.execute("Basic Page Rank Example");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
index 83cf9d9..2780bb1 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -70,8 +70,6 @@ public class PiEstimation implements java.io.Serializable {
System.out.println("We estimate Pi to be:");
pi.print();
-
- env.execute();
}
//*************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 86b7de2..46873f6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -104,12 +104,13 @@ public class LinearRegression {
// emit result
if(fileOutput) {
result.writeAsText(outputPath);
+ // execute program
+ env.execute("Linear Regression example");
} else {
result.print();
}
- // execute program
- env.execute("Linear Regression example");
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 2016eaa..9f6f567 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -72,16 +72,17 @@ public class EmptyFieldsCountAccumulator {
final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
// Here, we could do further processing with the filtered lines...
-
+ JobExecutionResult result = null;
// output the filtered lines
if (outputPath == null) {
filteredLines.print();
+ result = env.getLastJobExecutionResult();
} else {
filteredLines.writeAsCsv(outputPath);
+ // execute program
+ result = env.execute("Accumulator example");
}
- // execute program
- final JobExecutionResult result = env.execute("Accumulator example");
// get the accumulator result via its registration key
final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 890af65..9425291 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -140,13 +140,11 @@ public class WebLogAnalysis {
// emit result
if(fileOutput) {
result.writeAsCsv(outputPath, "\n", "|");
+ // execute program
+ env.execute("WebLogAnalysis Example");
} else {
result.print();
}
-
- // execute program
- env.execute("WebLogAnalysis Example");
-
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
index f3364fd..b001d12 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
@@ -97,12 +97,12 @@ public class PojoExample {
if(fileOutput) {
counts.writeAsText(outputPath, WriteMode.OVERWRITE);
+ // execute program
+ env.execute("WordCount-Pojo Example");
} else {
counts.print();
}
-
- // execute program
- env.execute("WordCount-Pojo Example");
+
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 7db7946..82c3ad8 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -74,12 +74,13 @@ public class WordCount {
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
+ // execute program
+ env.execute("WordCount Example");
} else {
counts.print();
}
- // execute program
- env.execute("WordCount Example");
+
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 26d01c3..08a3e62 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -96,12 +96,12 @@ object KMeans {
if (fileOutput) {
clusteredPoints.writeAsCsv(outputPath, "\n", " ")
+ env.execute("Scala KMeans Example")
}
else {
clusteredPoints.print()
}
- env.execute("Scala KMeans Example")
}
private def parseParameters(programArguments: Array[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index e75c862..9e23ed7 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -98,11 +98,11 @@ object ConnectedComponents {
}
if (fileOutput) {
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
+ env.execute("Scala Connected Components Example")
} else {
verticesWithComponents.print()
}
- env.execute("Scala Connected Components Example")
}
private def parseParameters(args: Array[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index b4955ed..ae8a982 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -100,6 +100,5 @@ object DeltaPageRank {
iteration.print()
- env.execute("Page Rank")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index a62786c..a9000b3 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -91,12 +91,13 @@ object EnumTrianglesBasic {
// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",")
+ // execute program
+ env.execute("TriangleEnumeration Example")
} else {
triangles.print()
}
- // execute program
- env.execute("TriangleEnumeration Example")
+
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
index 244e968..cc7c33f 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -117,12 +117,12 @@ object EnumTrianglesOpt {
// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",")
+ // execute program
+ env.execute("TriangleEnumeration Example")
} else {
triangles.print()
}
- // execute program
- env.execute("TriangleEnumeration Example")
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index a3ea4b3..5b5f6c2 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -128,12 +128,11 @@ object PageRankBasic {
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", " ")
+ // execute program
+ env.execute("Basic PageRank Example")
} else {
result.print()
}
-
- // execute program
- env.execute("Basic PageRank Example")
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 727cb47..3de0f2e 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -57,11 +57,11 @@ object TransitiveClosureNaive {
if (fileOutput) {
paths.writeAsCsv(outputPath, "\n", " ")
+ env.execute("Scala Transitive Closure Example")
} else {
paths.print()
}
- env.execute("Scala Transitive Closure Example")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
index 582dd4f..3453ee8 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
@@ -47,8 +47,6 @@ object PiEstimation {
println("We estimate Pi to be:")
pi.print()
-
- env.execute("PiEstimation example")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index fb99a00..2a7b786 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -82,11 +82,11 @@ object LinearRegression {
if (fileOutput) {
result.writeAsText(outputPath)
+ env.execute("Scala Linear Regression example")
}
else {
result.print()
}
- env.execute("Scala Linear Regression example")
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 950e3c8..5392594 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -123,11 +123,11 @@ object WebLogAnalysis {
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", "|")
+ env.execute("Scala WebLogAnalysis Example")
} else {
result.print()
}
- env.execute("Scala WebLogAnalysis Example")
}
private var fileOutput: Boolean = false
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
index b5c2ee2..7d5db7e 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
@@ -57,11 +57,11 @@ object WordCount {
if (fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ")
+ env.execute("Scala WordCount Example")
} else {
counts.print()
}
- env.execute("Scala WordCount Example")
}
private def parseParameters(args: Array[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index b48debc..51e91d7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -30,7 +30,8 @@ public class CollectionEnvironment extends ExecutionEnvironment {
// We need to reverse here. Object-Reuse enabled, means safe mode is disabled.
CollectionExecutor exec = new CollectionExecutor(getConfig());
- return exec.execute(p);
+ this.lastJobExecutionResult = exec.execute(p);
+ return this.lastJobExecutionResult;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 4f2942e..133a083 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
import org.apache.flink.api.java.operators.AggregateOperator;
@@ -82,7 +83,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.ExceptionUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -1337,14 +1337,10 @@ public abstract class DataSet<T> {
* For each element of the DataSet the result of {@link Object#toString()} is written.
* This triggers execute() automatically.
*/
- public void print() {
- try {
- List<T> elements = this.collect();
- for (T e: elements) {
- System.out.println(e);
- }
- } catch (Exception e) {
- System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+ public void print() throws Exception{
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.out.println(e);
}
}
@@ -1363,14 +1359,10 @@ public abstract class DataSet<T> {
* Writes a DataSet to the standard error stream (stderr).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
*/
- public void printToErr() {
- try {
- List<T> elements = this.collect();
- for (T e: elements) {
- System.err.println(e);
- }
- } catch (Exception e) {
- System.err.println("Could not retrieve values for printing: " + e);
+ public void printToErr() throws Exception{
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.err.println(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 75d4387..0f65b79 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,6 +94,9 @@ import com.google.common.base.Preconditions;
*/
public abstract class ExecutionEnvironment {
+
+ protected JobExecutionResult lastJobExecutionResult;
+
private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -234,7 +237,15 @@ public abstract class ExecutionEnvironment {
public UUID getId() {
return this.executionId;
}
-
+
+ /**
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+ */
+ public JobExecutionResult getLastJobExecutionResult(){
+ return this.lastJobExecutionResult;
+ }
+
+
/**
* Gets the UUID by which this environment is identified, as a string.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 25042b6..27b6254 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -51,7 +51,8 @@ public class LocalEnvironment extends ExecutionEnvironment {
PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
- return executor.executePlan(p);
+ this.lastJobExecutionResult = executor.executePlan(p);
+ return this.lastJobExecutionResult;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index c9a4fe0..515037c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -67,7 +67,9 @@ public class RemoteEnvironment extends ExecutionEnvironment {
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
- return executor.executePlan(p);
+
+ this.lastJobExecutionResult = executor.executePlan(p);
+ return this.lastJobExecutionResult;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index c0ca6c2..4fc51bb 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,8 @@ public class MultipleInvokationsTest {
// ----------- Execution 1 ---------------
DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
- data.print();
+ //data.print();
+ data.output(new DiscardingOutputFormat<String>()).name("print1");
data.output(new DiscardingOutputFormat<String>()).name("output1");
{
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 341d87e..e890b4e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -28,11 +28,9 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.junit.Test;
@@ -73,7 +71,7 @@ public class SemanticPropertiesProjectionTest {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
- tupleDs.project(1, 3, 2).project(0, 3).print();
+ tupleDs.project(1, 3, 2).project(0, 3).output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
@@ -99,7 +97,7 @@ public class SemanticPropertiesProjectionTest {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
- tupleDs.project(2, 3, 1).project(2).print();
+ tupleDs.project(2, 3, 1).project(2).output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
@@ -135,7 +133,7 @@ public class SemanticPropertiesProjectionTest {
tupleDs.join(tupleDs).where(0).equalTo(0)
.projectFirst(2, 3)
.projectSecond(1, 4)
- .print();
+ .output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
@@ -163,7 +161,7 @@ public class SemanticPropertiesProjectionTest {
tupleDs.join(tupleDs).where(0).equalTo(0)
.projectFirst(2,0)
.projectSecond(1,3)
- .print();
+ .output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
@@ -212,7 +210,7 @@ public class SemanticPropertiesProjectionTest {
tupleDs.cross(tupleDs)
.projectFirst(2, 3)
.projectSecond(1, 4)
- .print();
+ .output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
@@ -240,7 +238,7 @@ public class SemanticPropertiesProjectionTest {
tupleDs.cross(tupleDs)
.projectFirst(2, 0)
.projectSecond(1,3)
- .print();
+ .output(new DiscardingOutputFormat<Tuple>());
Plan plan = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index f0124e3..33b3958 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.junit.Test;
@@ -55,7 +56,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
- input.map(new WildcardForwardedMapper<Tuple3<Long,String,Integer>>()).print();
+ input.map(new WildcardForwardedMapper<Tuple3<Long,String,Integer>>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -80,7 +81,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
- input.map(new IndividualForwardedMapper<Long, String, Integer>()).print();
+ input.map(new IndividualForwardedMapper<Long, String, Integer>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -102,7 +103,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
- input.map(new ShufflingMapper<Long>()).print();
+ input.map(new ShufflingMapper<Long>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -128,7 +129,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
input.map(new NoAnnotationMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -151,7 +152,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -174,7 +175,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -196,7 +197,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
- input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).print();
+ input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -218,7 +219,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
- input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).print();
+ input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -260,7 +261,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Double>> input2 = env.fromElements(new Tuple2<Long, Double>(3l, 3.1415));
input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, String, Long, Double>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -287,7 +288,7 @@ public class SemanticPropertiesTranslationTest {
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
input1.join(input2).where(0).equalTo(0).with(new NoAnnotationJoin<Long>())
.withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -314,7 +315,7 @@ public class SemanticPropertiesTranslationTest {
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>())
.withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -347,7 +348,7 @@ public class SemanticPropertiesTranslationTest {
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin<Long>())
.withForwardedFieldsSecond("1")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -377,7 +378,7 @@ public class SemanticPropertiesTranslationTest {
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin<Long>())
.withForwardedFieldsFirst("0->1")
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -405,7 +406,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -435,7 +436,7 @@ public class SemanticPropertiesTranslationTest {
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 63b4052..0ce79e3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.StringValue;
import org.junit.Test;
@@ -46,7 +47,7 @@ public class AggregateTranslationTest {
DataSet<Tuple3<Double, StringValue, Long>> initialData =
env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));
- initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print();
+ initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
index 2fe9965..887173d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
@@ -54,7 +55,7 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
Collector<Long> out) {}
})
- .print();
+ .output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
@@ -98,7 +99,7 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
})
- .print();
+ .output(new DiscardingOutputFormat<Long>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index ae89780..f9ce82f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -91,7 +92,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
joined,
joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP));
- result.print();
+ result.output(new DiscardingOutputFormat<Tuple3<Double, Long, String>>());
result.writeAsText("/dev/null");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index b7fbb78..9824ee1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -60,7 +61,7 @@ public class DistinctTranslationTest {
public String getKey(String value) { return value; }
});
- op.print();
+ op.output(new DiscardingOutputFormat<String>());
Plan p = env.createProgramPlan();
@@ -81,7 +82,7 @@ public class DistinctTranslationTest {
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
- initialData.distinct().print();
+ initialData.distinct().output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
Plan p = env.createProgramPlan();
@@ -117,7 +118,7 @@ public class DistinctTranslationTest {
DataSet<CustomType> initialData = getSourcePojoDataSet(env);
- initialData.distinct().print();
+ initialData.distinct().output(new DiscardingOutputFormat<CustomType>());
Plan p = env.createProgramPlan();
@@ -153,7 +154,7 @@ public class DistinctTranslationTest {
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
- initialData.distinct(1, 2).print();
+ initialData.distinct(1, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
Plan p = env.createProgramPlan();
@@ -193,7 +194,7 @@ public class DistinctTranslationTest {
public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
return value.f1;
}
- }).setParallelism(4).print();
+ }).setParallelism(4).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
Plan p = env.createProgramPlan();
@@ -237,7 +238,7 @@ public class DistinctTranslationTest {
DataSet<CustomType> initialData = getSourcePojoDataSet(env);
- initialData.distinct("myInt").print();
+ initialData.distinct("myInt").output(new DiscardingOutputFormat<CustomType>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index b578eb7..b555844 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -54,7 +55,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
- }).print();
+ }).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
Plan p = env.createProgramPlan();
@@ -96,7 +97,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
return value1;
}
})
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
Plan p = env.createProgramPlan();
@@ -143,7 +144,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
return value1;
}
}).setParallelism(4)
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
Plan p = env.createProgramPlan();
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 973f402..20a4ef6 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 32bd6e9..ccc9b13 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@@ -42,13 +41,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
+ DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
- DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+ DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
iteration.closeWith(iterEnd, iterEnd)
- .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -64,5 +63,18 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
+ private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+ @Override
+ public Tuple2<T, T> map(T value) {
+ return new Tuple2<T, T>(value, value);
+ }
+ }
+
+ private static final class TestMapper<T> implements MapFunction<T, T> {
+ @Override
+ public T map(T value) {
+ return value;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index b4e95fb..9a60943 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -57,7 +58,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
input1.partitionCustom(partitioner, 1)
.join(input2.partitionCustom(partitioner, 0))
.where(1).equalTo(0)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -102,7 +103,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
.coGroup(input2.partitionCustom(partitioner, 0))
.where(1).equalTo(0)
.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index 346e702..14d2a96 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -57,7 +58,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
.where(1).equalTo(0)
.withPartitioner(partitioner)
.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -118,7 +119,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
.where("b").equalTo("a")
.withPartitioner(partitioner)
.with(new DummyCoGroupFunction<Pojo2, Pojo3>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -180,7 +181,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
.where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector())
.withPartitioner(partitioner)
.with(new DummyCoGroupFunction<Pojo2, Pojo3>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -251,7 +252,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
grouped
.coGroup(partitioned).where(0).equalTo(0)
.with(new DummyCoGroupFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 17a7659..bc2eb82 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -57,8 +58,8 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
.withPartitioner(partitioner);
joined.groupBy(1).withPartitioner(partitioner)
- .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+ .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index 00fd587..1aca046 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -53,7 +54,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
data
.partitionCustom(part, 0)
.mapPartition(new IdentityPartitionerMapper<Tuple2<Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -123,7 +124,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
data
.partitionCustom(part, "a")
.mapPartition(new IdentityPartitionerMapper<Pojo>())
- .print();
+ .output(new DiscardingOutputFormat<Pojo>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -193,7 +194,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
data
.partitionCustom(part, new TestKeySelectorInt<Pojo>())
.mapPartition(new IdentityPartitionerMapper<Pojo>())
- .print();
+ .output(new DiscardingOutputFormat<Pojo>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 23f4812..b2bfc67 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -53,7 +54,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
.withPartitioner(new TestPartitionerInt())
.reduce(new DummyReducer<Tuple2<Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -84,8 +85,8 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
.withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
- .print();
+ .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -116,7 +117,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
.withPartitioner(new TestPartitionerInt())
.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 54033ac..dc2b147 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -37,26 +38,26 @@ import org.junit.Test;
@SuppressWarnings("serial")
public class GroupingPojoTranslationTest extends CompilerTestBase {
-
+
@Test
public void testCustomPartitioningTupleReduce() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo2> data = env.fromElements(new Pojo2())
.rebalance().setParallelism(4);
-
+
data.groupBy("a").withPartitioner(new TestPartitionerInt())
- .reduce(new DummyReducer<Pojo2>())
- .print();
-
+ .reduce(new DummyReducer<Pojo2>())
+ .output(new DiscardingOutputFormat<Pojo2>());
+
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
-
+
SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-
+
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -66,26 +67,26 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
@Test
public void testCustomPartitioningTupleGroupReduce() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo2> data = env.fromElements(new Pojo2())
.rebalance().setParallelism(4);
-
+
data.groupBy("a").withPartitioner(new TestPartitionerInt())
- .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
- .print();
-
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
+ .output(new DiscardingOutputFormat<Pojo2>());
+
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
-
+
SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-
+
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -95,27 +96,27 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
@Test
public void testCustomPartitioningTupleGroupReduceSorted() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo3> data = env.fromElements(new Pojo3())
.rebalance().setParallelism(4);
-
+
data.groupBy("a").withPartitioner(new TestPartitionerInt())
- .sortGroup("b", Order.ASCENDING)
- .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
- .print();
-
+ .sortGroup("b", Order.ASCENDING)
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
+ .output(new DiscardingOutputFormat<Pojo3>());
+
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
-
+
SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-
+
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -125,28 +126,28 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo4> data = env.fromElements(new Pojo4())
.rebalance().setParallelism(4);
-
+
data.groupBy("a").withPartitioner(new TestPartitionerInt())
- .sortGroup("b", Order.ASCENDING)
- .sortGroup("c", Order.DESCENDING)
- .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
- .print();
-
+ .sortGroup("b", Order.ASCENDING)
+ .sortGroup("c", Order.DESCENDING)
+ .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
+ .output(new DiscardingOutputFormat<Pojo4>());
+
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
-
+
SinkPlanNode sink = op.getDataSinks().iterator().next();
SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-
+
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -156,15 +157,15 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
@Test
public void testCustomPartitioningTupleInvalidType() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo2> data = env.fromElements(new Pojo2())
.rebalance().setParallelism(4);
-
+
try {
data.groupBy("a").withPartitioner(new TestPartitionerLong());
fail("Should throw an exception");
@@ -176,19 +177,19 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
@Test
public void testCustomPartitioningTupleInvalidTypeSorted() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo3> data = env.fromElements(new Pojo3())
.rebalance().setParallelism(4);
-
+
try {
data.groupBy("a")
- .sortGroup("b", Order.ASCENDING)
- .withPartitioner(new TestPartitionerLong());
+ .sortGroup("b", Order.ASCENDING)
+ .withPartitioner(new TestPartitionerLong());
fail("Should throw an exception");
}
catch (InvalidProgramException e) {}
@@ -198,18 +199,18 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
@Test
public void testCustomPartitioningTupleRejectCompositeKey() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Pojo2> data = env.fromElements(new Pojo2())
.rebalance().setParallelism(4);
-
+
try {
data.groupBy("a", "b")
- .withPartitioner(new TestPartitionerInt());
+ .withPartitioner(new TestPartitionerInt());
fail("Should throw an exception");
}
catch (InvalidProgramException e) {}
@@ -219,39 +220,39 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
fail(e.getMessage());
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public static class Pojo2 {
public int a;
public int b;
-
+
}
-
+
public static class Pojo3 {
public int a;
public int b;
public int c;
}
-
+
public static class Pojo4 {
public int a;
public int b;
public int c;
public int d;
}
-
+
private static class TestPartitionerInt implements Partitioner<Integer> {
@Override
public int partition(Integer key, int numPartitions) {
return 0;
}
}
-
+
private static class TestPartitionerLong implements Partitioner<Long> {
@Override
public int partition(Long key, int numPartitions) {
return 0;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 49f44f5..6eb5ad5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
@@ -51,7 +52,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.sum(1)
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -80,7 +81,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.reduce(new DummyReducer<Tuple2<Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -109,7 +110,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -139,7 +140,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
data.groupBy(0).withPartitioner(new TestPartitionerInt())
.sortGroup(1, Order.ASCENDING)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
@@ -170,7 +171,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
.sortGroup(1, Order.ASCENDING)
.sortGroup(2, Order.DESCENDING)
.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
- .print();
+ .output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
[5/5] flink git commit: [FLINK-2071] [java api] Fix serializability
issue with projection function.
Posted by se...@apache.org.
[FLINK-2071] [java api] Fix serializability issue with projection function.
Improve type safety.
Minor cleanups in ProjectOperator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad1d9362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad1d9362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad1d9362
Branch: refs/heads/master
Commit: ad1d9362c88343972b19fdcce0b041d8380805a9
Parents: 6220f34
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 14:47:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200
----------------------------------------------------------------------
.../api/java/operators/ProjectOperator.java | 27 +++++++------
.../translation/PlanProjectOperator.java | 41 +++++++++++++-------
2 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9b7d567..55e182f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -82,7 +82,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
* Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
* The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
- * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+ * Additional fields can be added to the projection by calling this method repeatedly.
*
* <b>Note: With the current implementation, the Project transformation looses type information.</b>
*
@@ -94,8 +94,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
* @see DataSet
* @see ProjectOperator
*/
- @SuppressWarnings("hiding")
- public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
+ public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
proj.acceptAdditionalIndexes(fieldIndexes);
return proj.projectTupleX();
@@ -103,10 +102,10 @@ public class ProjectOperator<IN, OUT extends Tuple>
/**
* Deprecated method only kept for compatibility.
*/
- @SuppressWarnings({ "unchecked", "hiding" })
+ @SuppressWarnings("unchecked")
@Deprecated
- public <OUT extends Tuple> ProjectOperator<IN, OUT> types(Class<?>... types) {
- TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+ public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
+ TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
if(types.length != typeInfo.getArity()) {
throw new InvalidProgramException("Provided types do not match projection.");
@@ -117,12 +116,12 @@ public class ProjectOperator<IN, OUT extends Tuple>
throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
}
}
- return (ProjectOperator<IN, OUT>) this;
+ return (ProjectOperator<IN, R>) this;
}
public static class Projection<T> {
- private final DataSet<T> ds;
+ private final DataSet<T> ds;
private int[] fieldIndexes;
public Projection(DataSet<T> ds, int[] fieldIndexes) {
@@ -138,9 +137,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
}
- int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
- for(int i=0; i<fieldIndexes.length; i++) {
- Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
+ int maxFieldIndex = ds.getType().getArity();
+ for (int fieldIndexe : fieldIndexes) {
+ Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
}
this.ds = ds;
@@ -160,8 +159,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
- int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
- for(int i=0; i<additionalIndexes.length; i++) {
+ int maxFieldIndex = ds.getType().getArity();
+ for (int i = 0; i < additionalIndexes.length; i++) {
Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
this.fieldIndexes[offset + i] = additionalIndexes[i];
@@ -186,7 +185,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
*/
@SuppressWarnings("unchecked")
public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {
- ProjectOperator<T, OUT> projOperator = null;
+ ProjectOperator<T, OUT> projOperator;
switch (fieldIndexes.length) {
case 1: projOperator = (ProjectOperator<T, OUT>) projectTuple1(); break;
http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 959b929..101b89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple;
public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
- public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig executionConfig) {
- super(new MapProjector<T, R>(fields, outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name);
+ public PlanProjectOperator(int[] fields, String name,
+ TypeInformation<T> inType, TypeInformation<R> outType,
+ ExecutionConfig executionConfig)
+ {
+ super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
+ return (MapFunction<T, R>) new MapProjector<X, R>(fields);
}
- public static final class MapProjector<T, R extends Tuple>
- extends AbstractRichFunction
- implements MapFunction<T, R>
+
+ public static final class MapProjector<T extends Tuple, R extends Tuple>
+ extends AbstractRichFunction implements MapFunction<T, R>
{
private static final long serialVersionUID = 1L;
private final int[] fields;
- private final R outTuple;
+ private final Tuple outTuple;
- private MapProjector(int[] fields, R outTupleInstance) {
+ private MapProjector(int[] fields) {
this.fields = fields;
- this.outTuple = outTupleInstance;
+ try {
+ this.outTuple = Tuple.getTupleClass(fields.length).newInstance();
+ }
+ catch (Exception e) {
+ // this should never happen
+ throw new RuntimeException(e);
+ }
}
// TODO We should use code generation for this.
+ @SuppressWarnings("unchecked")
@Override
- public R map(T inTuple) throws Exception {
-
- for(int i=0; i<fields.length; i++) {
- outTuple.setField(((Tuple)inTuple).getField(fields[i]), i);
+ public R map(Tuple inTuple) throws Exception {
+ for (int i = 0; i < fields.length; i++) {
+ outTuple.setField(inTuple.getField(fields[i]), i);
}
- return outTuple;
+
+ return (R) outTuple;
}
}
}
[4/5] flink git commit: [FLINK-1418] [apis] Minor cleanups for eager
on-client print() statement.
Posted by se...@apache.org.
[FLINK-1418] [apis] Minor cleanups for eager on-client print() statement.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6220f34b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6220f34b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6220f34b
Branch: refs/heads/master
Commit: 6220f34b626e9ed92b190042e0d69020e19461f1
Parents: 78d954b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 13:49:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 51 ++++++++++++--------
.../flink/api/java/ExecutionEnvironment.java | 10 ++--
.../org/apache/flink/api/scala/DataSet.scala | 41 ++++++++++------
.../flink/api/scala/ExecutionEnvironment.scala | 6 +--
4 files changed, 66 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 133a083..157c666 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -338,7 +338,7 @@ public abstract class DataSet<T> {
*
* @see org.apache.flink.api.java.operators.AggregateOperator
*/
- public AggregateOperator<T> sum (int field) {
+ public AggregateOperator<T> sum(int field) {
return aggregate(Aggregations.SUM, field);
}
@@ -997,7 +997,7 @@ public abstract class DataSet<T> {
* <pre>
* {@code
* DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
- * initialState.iterateDelta(initialFeedbakSet, 100, 0);
+ * initialState.iterateDelta(initialFeedbackSet, 100, 0);
*
* DataSet<Tuple2<Long, Long>> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
* .join(iteration.getSolutionSet()).where(0).equalTo(0)
@@ -1236,7 +1236,7 @@ public abstract class DataSet<T> {
* @see TextOutputFormat
*/
public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> formatter) {
- return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath);
+ return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath);
}
/**
@@ -1250,8 +1250,8 @@ public abstract class DataSet<T> {
*
* @see TextOutputFormat
*/
- public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter<T> formatter) {
- return this.map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
+ public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, TextFormatter<T> formatter) {
+ return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
}
/**
@@ -1333,11 +1333,16 @@ public abstract class DataSet<T> {
}
/**
- * Writes a DataSet to the standard output stream (stdout).<br/>
- * For each element of the DataSet the result of {@link Object#toString()} is written.
- * This triggers execute() automatically.
+ * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
+ * the print() method. For programs that are executed in a cluster, this method needs
+ * to gather the contents of the DataSet back to the client, to print it there.
+ *
+ * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+ *
+ * <p>This method immediately triggers the program execution, similar to the
+ * {@link #collect()} and {@link #count()} methods.</p>
*/
- public void print() throws Exception{
+ public void print() throws Exception {
List<T> elements = this.collect();
for (T e: elements) {
System.out.println(e);
@@ -1345,6 +1350,23 @@ public abstract class DataSet<T> {
}
/**
+ * Prints the elements in a DataSet to the standard error stream {@link System#err} of the JVM that calls
+ * the print() method. For programs that are executed in a cluster, this method needs
+ * to gather the contents of the DataSet back to the client, to print it there.
+ *
+ * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+ *
+ * <p>This method immediately triggers the program execution, similar to the
+ * {@link #collect()} and {@link #count()} methods.</p>
+ */
+ public void printToErr() throws Exception {
+ List<T> elements = this.collect();
+ for (T e: elements) {
+ System.err.println(e);
+ }
+ }
+
+ /**
* Writes a DataSet to the standard output stream (stdout).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
*
@@ -1354,17 +1376,6 @@ public abstract class DataSet<T> {
public DataSink<T> print(String sinkIdentifier) {
return output(new PrintingOutputFormat<T>(sinkIdentifier, false));
}
-
- /**
- * Writes a DataSet to the standard error stream (stderr).<br/>
- * For each element of the DataSet the result of {@link Object#toString()} is written.
- */
- public void printToErr() throws Exception{
- List<T> elements = this.collect();
- for (T e: elements) {
- System.err.println(e);
- }
- }
/**
* Writes a DataSet to the standard error stream (stderr).<br/>
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0f65b79..9c76409 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,9 +94,6 @@ import com.google.common.base.Preconditions;
*/
public abstract class ExecutionEnvironment {
-
- protected JobExecutionResult lastJobExecutionResult;
-
private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -116,8 +113,11 @@ public abstract class ExecutionEnvironment {
private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
- private ExecutionConfig config = new ExecutionConfig();
+ private final ExecutionConfig config = new ExecutionConfig();
+ /** Result from the latest execution, to be make it retrievable when using eager execution methods */
+ protected JobExecutionResult lastJobExecutionResult;
+
/** Flag to indicate whether sinks have been cleared in previous executions */
private boolean wasExecuted = false;
@@ -240,6 +240,8 @@ public abstract class ExecutionEnvironment {
/**
* Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+ *
+ * @return The execution result from the latest job execution.
*/
public JobExecutionResult getLastJobExecutionResult(){
return this.lastJobExecutionResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5198157..e283e95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1319,16 +1319,37 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
def output(outputFormat: OutputFormat[T]): DataSink[T] = {
javaSet.output(outputFormat)
}
-
+
/**
- * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
- * each element.
- * This triggers execute() automatically.
+ * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
+ * JVM that calls the print() method. For programs that are executed in a cluster, this
+ * method needs to gather the contents of the DataSet back to the client, to print it
+ * there.
+ *
+ * The string written for each element is defined by the [[AnyRef.toString]] method.
+ *
+ * This method immediately triggers the program execution, similar to the
+ * [[collect()]] and [[count()]] methods.
*/
- def print() = {
+ def print(): Unit = {
javaSet.print()
}
-
+
+ /**
+ * Prints the elements in a DataSet to the standard error stream [[System.err]] of the
+ * JVM that calls the print() method. For programs that are executed in a cluster, this
+ * method needs to gather the contents of the DataSet back to the client, to print it
+ * there.
+ *
+ * The string written for each element is defined by the [[AnyRef.toString]] method.
+ *
+ * This method immediately triggers the program execution, similar to the
+ * [[collect()]] and [[count()]] methods.
+ */
+ def printToErr(): Unit = {
+ javaSet.printToErr()
+ }
+
/**
* *
* Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
@@ -1340,14 +1361,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
}
/**
- * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
- * each element.
- */
- def printToErr() = {
- javaSet.printToErr()
- }
-
- /**
* Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed.
* This uses [[AnyRef.toString]] on each element.
* @param sinkIdentifier The string to prefix the output with.
http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6cc327a..e01ff3b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -130,11 +130,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
def getId: UUID = {
javaEnv.getId
}
-
-
+
/**
- * retrieves JobExecutionResult from last job execution (for "eager" print)
- * @return JobExecutionResult form last job execution
+ * Gets the JobExecutionResult of the last executed job.
*/
def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult