You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/21 15:40:00 UTC

[1/5] flink git commit: [FLINK-1418] [apis] Fix eager print() and adjust all tests and examples to not fail due to "eager" print method

Repository: flink
Updated Branches:
  refs/heads/master 939e3fc40 -> ad1d9362c


http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index ff429b8..65b9756 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -55,7 +56,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 			
 			input1
 				.join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -114,7 +115,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 			input1
 				.join(input2, JoinHint.REPARTITION_HASH_FIRST)
 				.where("b").equalTo("a").withPartitioner(partitioner)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -176,7 +177,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 				.where(new Pojo2KeySelector())
 				.equalTo(new Pojo3KeySelector())
 				.withPartitioner(partitioner)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -248,7 +249,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 			grouped
 				.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
 				.with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
index ab83dba..25b17f8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.fail;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
@@ -54,7 +55,7 @@ public class DeltaIterationDependenciesTest extends CompilerTestBase {
 
 			DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
 
-			result.print();
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index 96758b1..e5b6ad5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -48,7 +49,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
 			data.distinct(0)
 				.groupBy(0)
 				.sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -84,7 +85,7 @@ public class DistinctAndGroupingOptimizerTest extends CompilerTestBase {
 			data.distinct(1)
 				.groupBy(0)
 				.sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 8fb4ef0..6b49dd4 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -51,7 +52,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
 				public void reduce(Iterable<Double> values, Collector<Double> out) {}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Double>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -97,7 +98,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print();
+			reduced.output(new DiscardingOutputFormat<Long>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -148,7 +149,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -199,7 +200,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print();
+			reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -257,7 +258,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -317,7 +318,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print();
+			reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 796d4ab..bcfb2ef 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -48,7 +49,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			env.setParallelism(43);
 			
 			IterativeDataSet<Long> iteration = env.generateSequence(-4, 1000).iterate(100);
-			iteration.closeWith(iteration).print();
+			iteration.closeWith(iteration).output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -76,7 +77,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 					
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = input.iterateDelta(input, 100, 0);
 			iter.closeWith(iter.getWorkset(), iter.getWorkset())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -99,7 +100,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 			
 			iteration.closeWith(
 					iteration.map(new IdentityMapper<Long>()).union(iteration.map(new IdentityMapper<Long>())))
-					.print();
+					.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -150,7 +151,7 @@ public class IterationCompilerTest extends CompilerTestBase {
 				.union(
 						iter.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>()))
 				)
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
index 14d863d..b3718b0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -131,7 +133,7 @@ public class JoinTranslationTest extends CompilerTestBase {
 		DataSet<Long> i1 = env.generateSequence(1, 1000);
 		DataSet<Long> i2 = env.generateSequence(1, 1000);
 		
-		i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).print();
+		i1.join(i2, hint).where(new IdentityKeySelector<Long>()).equalTo(new IdentityKeySelector<Long>()).output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 		
 		Plan plan = env.createProgramPlan();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
index 3f18e62..e1e6b5f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -45,7 +46,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Long>());
 			
 			try {
 				env.createProgramPlan();
@@ -72,9 +73,9 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Long> mapped = iteration.map(new IdentityMapper<Long>());
 			
-			iteration.closeWith(mapped).print();
+			iteration.closeWith(mapped).output(new DiscardingOutputFormat<Long>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -104,7 +105,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			try {
 				env.createProgramPlan();
@@ -132,7 +133,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> mapped = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
 			
-			mapped.print();
+			mapped.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			try {
 				env.createProgramPlan();
@@ -164,7 +165,7 @@ public class OpenIterationTest extends CompilerTestBase {
 												.where(0).equalTo(0).projectFirst(1).projectSecond(0);
 			
 			iteration.closeWith(joined, joined)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 9c2d0d2..7f5c209 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -49,8 +50,8 @@ public class PartitionOperatorTest extends CompilerTestBase {
 					public int partition(Long key, int numPartitions) { return key.intValue(); }
 				}, 1)
 				.groupBy(1)
-				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
-				.print();
+					.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 2958f1a..942aa47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
@@ -53,7 +54,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Double>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -98,7 +99,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Long>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -151,7 +152,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -211,7 +212,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print();
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 46eb48a..3d6d90b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -294,7 +295,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 				joinedWithSolutionSet;
 		
 		iter.closeWith(nextSolutionSet, nextWorkset)
-			.print();
+			.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		
 		return env.createProgramPlan();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index fb7a80f..b23bf35 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.Optimizer;
@@ -48,7 +49,7 @@ public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase {
 			input.coGroup(input).where(0).equalTo(0)
 				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
 				.withParameters(cfg)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 8a4786f..a4e520b 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.Optimizer;
@@ -46,7 +47,7 @@ public class JoinOnConflictingPartitioningsTest extends CompilerTestBase {
 			
 			input.join(input).where(0).equalTo(0)
 				.withParameters(cfg)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index efa1e88..6cc327a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -131,6 +131,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.getId
   }
 
+
+  /**
+   * retrieves JobExecutionResult from last job execution (for "eager" print)
+   * @return JobExecutionResult form last job execution
+   */
+  def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult
+
   /**
    * Gets the UUID by which this environment is identified, as a string.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index a1bd2e0..b4a27b6 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -19,6 +19,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
@@ -296,7 +297,7 @@ public abstract class PlanBinder<INFO extends OperationInfo> {
 		int parentID = (Integer) receiver.getRecord(true);
 		DataSet parent = (DataSet) sets.get(parentID);
 		boolean toError = (Boolean) receiver.getRecord();
-		(toError ? parent.printToErr() : parent.print()).name("PrintSink");
+		parent.output(new PrintingOutputFormat(toError));
 	}
 
 	private void createBroadcastVariable() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index d0b0164..018daf8 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
@@ -59,7 +60,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
 				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
 				
-				result.print();
+				result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			}
 			
 			Plan p = env.createProgramPlan("Spargel Connected Components");
@@ -134,7 +135,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 				
 				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
 				
-				result.print();
+				result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			}
 			
 			Plan p = env.createProgramPlan("Spargel Connected Components");

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
index 2840914..7189bbe 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 
 public class CollectionTestEnvironment extends CollectionEnvironment {
 
-	protected JobExecutionResult latestResult;
-
 	@Override
 	public JobExecutionResult execute() throws Exception {
 		return execute("test job");
@@ -35,7 +33,7 @@ public class CollectionTestEnvironment extends CollectionEnvironment {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		JobExecutionResult result = super.execute(jobName);
-		this.latestResult = result;
+		this.lastJobExecutionResult = result;
 		return result;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 2214000..e639c80 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -119,7 +119,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 				// call the test program
 				try {
 					testProgram();
-					this.latestExecutionResult = env.latestResult;
+					this.latestExecutionResult = env.getLastJobExecutionResult();
 				}
 				catch (Exception e) {
 					System.err.println(e.getMessage());
@@ -171,7 +171,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 				// call the test program
 				try {
 					testProgram();
-					this.latestExecutionResult = env.latestResult;
+					this.latestExecutionResult = env.getLastJobExecutionResult();
 				}
 				catch (Exception e) {
 					System.err.println(e.getMessage());
@@ -224,7 +224,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 		// call the test program
 		try {
 			testProgram();
-			this.latestExecutionResult = env.latestResult;
+			this.latestExecutionResult = env.getLastJobExecutionResult();
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index cf1caeb..25f2c83 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -36,9 +36,6 @@ public class TestEnvironment extends ExecutionEnvironment {
 
 	private final ForkableFlinkMiniCluster executor;
 
-	protected JobExecutionResult latestResult;
-
-
 	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
 		setParallelism(parallelism);
@@ -54,8 +51,8 @@ public class TestEnvironment extends ExecutionEnvironment {
 			
 			SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false);
 
-			this.latestResult = result.toJobExecutionResult(getClass().getClassLoader());
-			return this.latestResult;
+			this.lastJobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader());
+			return this.lastJobExecutionResult;
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index 27c1644..aea448f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -58,8 +59,8 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Double>> result = constructPlan(inputData, 10);
 			
 			// add two sinks, to test the case of branching after an iteration
-			result.print();
-			result.print();
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
+			result.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
 		
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 4edd68e..a3b7572 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -23,6 +23,7 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
 import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -85,7 +86,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
 					// termination condition
 					.filter(new EpsilonFilter()));
 	
-			finalPageRanks.print();
+			finalPageRanks.output(new DiscardingOutputFormat<Tuple2<Long, Double>>());
 	
 			// get the plan and compile it
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
index 25cc089..2775d09 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Test
 import org.apache.flink.api.common.InvalidProgramException
 
@@ -37,7 +38,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int, String)])
   }
 
   @Test
@@ -51,7 +52,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -65,7 +66,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -79,7 +80,8 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()  }
+    iteration.output(new DiscardingOutputFormat[(Int,String)])  
+  }
 
   @Test(expected = classOf[InvalidProgramException])
   def testIncorrectJoinWithSolution3(): Unit = {
@@ -92,7 +94,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
    }
 
   @Test
@@ -106,7 +108,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test
@@ -120,7 +122,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -134,7 +136,7 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 
   @Test(expected = classOf[InvalidProgramException])
@@ -148,7 +150,8 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()  }
+    iteration.output(new DiscardingOutputFormat[(Int,String)])  
+  }
 
   @Test(expected = classOf[InvalidProgramException])
   def testIncorrectCoGroupWithSolution3(): Unit = {
@@ -161,6 +164,6 @@ class DeltaIterationSanityCheckTest extends Serializable {
       (result, ws)
     }
 
-    iteration.print()
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
index 3fefa01..97a0f87 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.compiler
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Test
 import org.junit.Assert._
@@ -38,8 +39,8 @@ class PartitionOperatorTranslationTest extends CompilerTestBase {
           def partition(key: Long, numPartitions: Int): Int = key.intValue()
         }, 1)
         .groupBy(1).reduceGroup( x => x)
-        .print()
-      
+        .output(new DiscardingOutputFormat[Iterator[(Long, Long)]])
+
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
       

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
index eecc347..cc2c81e 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.scala.functions
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Assert._
 import org.apache.flink.api.common.functions.RichJoinFunction
 import org.apache.flink.api.common.functions.RichMapFunction
@@ -46,7 +47,8 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new WildcardForwardMapper[(Long, String, Int)]).print()
+      input.map(new WildcardForwardMapper[(Long, String, Int)])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
 
       val plan = env.createProgramPlan()
 
@@ -83,7 +85,8 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new IndividualForwardMapper[Long, String, Int]).print()
+      input.map(new IndividualForwardMapper[Long, String, Int])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
 
       val plan = env.createProgramPlan()
 
@@ -120,7 +123,8 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new FieldTwoForwardMapper[Long, String, Int]).print()
+      input.map(new FieldTwoForwardMapper[Long, String, Int])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
 
       val plan = env.createProgramPlan()
 
@@ -160,7 +164,8 @@ class SemanticPropertiesTranslationTest {
       val input2 = env.fromElements((3L, 3.1415))
 
       input1.join(input2).where(0).equalTo(0)(
-        new ForwardingTupleJoin[Long, String, Long, Double]).print()
+        new ForwardingTupleJoin[Long, String, Long, Double])
+        .output(new DiscardingOutputFormat[(String, Long)])
 
       val plan = env.createProgramPlan()
       val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
@@ -204,7 +209,8 @@ class SemanticPropertiesTranslationTest {
       val input2 = env.fromElements((3L, 42))
 
       input1.join(input2).where(0).equalTo(0)(
-        new ForwardingBasicJoin[(Long, String), (Long, Int)]).print()
+        new ForwardingBasicJoin[(Long, String), (Long, Int)])
+        .output(new DiscardingOutputFormat[((Long, String), (Long, Int))])
 
       val plan = env.createProgramPlan()
       val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
index 425cff6..6babbe7 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
 
 import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala._
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.Test
@@ -37,7 +38,8 @@ class AggregateTranslationTest {
 
       val initialData = env.fromElements((3.141592, "foobar", 77L))
 
-      initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print()
+      initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2)
+        .output(new DiscardingOutputFormat[(Double, String, Long)])
 
       val p: Plan = env.createProgramPlan()
       val sink = p.getDataSinks.iterator.next
@@ -55,6 +57,7 @@ class AggregateTranslationTest {
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Test caused an error: " + e.getMessage)
+
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
index 8d75f2e..4d85c58 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -46,7 +47,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
           .coGroup(input2)
           .where(1).equalTo(0)
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Array[(Long, Long)], Array[(Long, Long, Long)])])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -110,7 +111,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
           .coGroup(input2)
           .where("b").equalTo("a")
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])])
         
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -174,7 +175,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase {
           .coGroup(input2)
           .where( _.a ).equalTo( _.b )
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
index 8d816ee..a0f93dd 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -49,7 +50,7 @@ class CoGroupGroupSortTranslationTest {
           .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) {
                (first, second) => first.buffered.head
             }
-        .print()
+        .output(new DiscardingOutputFormat[(Long, Long)])
         
       val p = env.createProgramPlan()
       
@@ -92,7 +93,7 @@ class CoGroupGroupSortTranslationTest {
           .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) {
                (first, second) => first.buffered.head
             }
-          .print()
+          .output(new DiscardingOutputFormat[(Long, Long)])
           
       val p = env.createProgramPlan()
       

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 395f36a..f81cb84 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -40,7 +41,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
       data
         .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
         .reduce( (a,b) => a )
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -73,7 +74,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
       data
         .groupBy( _._1 ).withPartitioner(new TestPartitionerInt())
         .reduce( (a, b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -107,7 +108,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
         .withPartitioner(new TestPartitionerInt())
         .sortGroup(1, Order.ASCENDING)
         .reduce( (a,b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -141,7 +142,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
         .withPartitioner(new TestPartitionerInt())
         .sortGroup(_._2, Order.ASCENDING)
         .reduce( (a,b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -175,7 +176,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase {
         .sortGroup(1, Order.ASCENDING)
         .sortGroup(2, Order.DESCENDING)
         .reduce( (a,b) => a)
-        .print()
+        .output(new DiscardingOutputFormat[(Int, Int, Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
index a02d2af..6e40ea5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -41,7 +42,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
       data
           .groupBy("a").withPartitioner(new TestPartitionerInt())
           .reduce( (a,b) => a )
-          .print()
+          .output(new DiscardingOutputFormat[Pojo2])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -72,7 +73,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
       data
           .groupBy("a").withPartitioner(new TestPartitionerInt())
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[Pojo2]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -102,7 +103,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
           .groupBy("a").withPartitioner(new TestPartitionerInt())
           .sortGroup("b", Order.ASCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[Pojo3]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -133,7 +134,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase {
           .sortGroup("b", Order.ASCENDING)
           .sortGroup("c", Order.DESCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[Pojo4]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
index 25efe48..b103e9c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Assert._
 import org.junit.Test
 
@@ -42,7 +43,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
       data.groupBy(0)
           .withPartitioner(new TestPartitionerInt())
           .sum(1)
-          .print()
+          .output(new DiscardingOutputFormat[(Int, Int)])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -73,7 +74,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
       data
           .groupBy(0).withPartitioner(new TestPartitionerInt())
           .reduce( (a,b) => a )
-          .print()
+          .output(new DiscardingOutputFormat[(Int, Int)])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -104,7 +105,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
       data
           .groupBy(0).withPartitioner(new TestPartitionerInt())
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[(Int, Int)]])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -134,7 +135,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
           .groupBy(0).withPartitioner(new TestPartitionerInt())
           .sortGroup(1, Order.ASCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[(Int, Int, Int)]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -165,7 +166,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase {
           .sortGroup(1, Order.ASCENDING)
           .sortGroup(2, Order.DESCENDING)
           .reduceGroup( iter => Seq(iter.next) )
-          .print()
+          .output(new DiscardingOutputFormat[Seq[(Int, Int, Int, Int)]])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index fe30376..7ebf378 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala._
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Test
@@ -43,7 +44,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       data.partitionCustom(part, 0)
           .mapPartition( x => x )
-          .print()
+          .output(new DiscardingOutputFormat[(Int, Int)])
 
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -113,7 +114,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       data
           .partitionCustom(part, "a")
           .mapPartition( x => x)
-          .print()
+          .output(new DiscardingOutputFormat[Pojo])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -184,7 +185,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       data
           .partitionCustom(part, pojo => pojo.a)
           .mapPartition( x => x)
-          .print()
+          .output(new DiscardingOutputFormat[Pojo])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
index 6aa4d75..9a400c5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.scala.operators.translation
 import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction,
 RichJoinFunction}
 import org.apache.flink.api.common.operators.GenericDataSinkBase
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.operators.translation.WrappingFunction
 import org.junit.Assert.assertArrayEquals
 import org.junit.Assert.assertEquals
@@ -67,7 +68,7 @@ class DeltaIterationTranslationTest {
         .setParallelism(ITERATION_PARALLELISM)
         .registerAggregator(AGGREGATOR_NAME, new LongSumAggregator)
 
-      result.print()
+      result.output(new DiscardingOutputFormat[(Double, Long, String)])
       result.writeAsText("/dev/null")
 
       val p: Plan = env.createProgramPlan(JOB_NAME)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
index 7836400..c540f61 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.operators.translation
 
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.junit.Assert
 import org.junit.Test
 
@@ -31,7 +32,7 @@ class DistinctTranslationTest {
       val input = env.fromElements("1", "2", "1", "3")
 
       val op = input.distinct { x => x}
-      op.print()
+      op.output(new DiscardingOutputFormat[String])
 
       val p = env.createProgramPlan()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index 2467596..eae3db1 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.optimizer.util.CompilerTestBase
 import org.junit.Assert._
 import org.junit.Test
@@ -46,7 +47,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
           .join(input2, JoinHint.REPARTITION_HASH_FIRST)
           .where(1).equalTo(0)
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[((Long, Long), (Long, Long, Long))])
       
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -110,7 +111,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
           .join(input2, JoinHint.REPARTITION_HASH_FIRST)
           .where("b").equalTo("a")
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Pojo2, Pojo3)])
         
       val p = env.createProgramPlan()
       val op = compileNoStats(p)
@@ -174,7 +175,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase {
           .join(input2, JoinHint.REPARTITION_HASH_FIRST)
           .where( _.a ).equalTo( _.b )
           .withPartitioner(partitioner)
-        .print()
+        .output(new DiscardingOutputFormat[(Pojo2, Pojo3)])
           
       val p = env.createProgramPlan()
       val op = compileNoStats(p)

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
index e97fc21..5d3878c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.operators.translation
 
 import org.apache.flink.api.common.operators.{GenericDataSourceBase, GenericDataSinkBase}
+import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper,
 PlanUnwrappingReduceOperator}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -39,7 +40,8 @@ class ReduceTranslationTest {
       val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
 
 
-      initialData reduce { (v1, v2) => v1 } print()
+      initialData reduce { (v1, v2) => v1 } output(
+        new DiscardingOutputFormat[(Double, String, Long)])
 
       val p = env.createProgramPlan(
 
@@ -70,7 +72,8 @@ class ReduceTranslationTest {
 
       val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
 
-      initialData.groupBy(2) reduce { (v1, v2) => v1 } print()
+      initialData.groupBy(2) reduce { (v1, v2) => v1 } output(
+        new DiscardingOutputFormat[(Double, String, Long)])
 
       val p = env.createProgramPlan()
 
@@ -99,7 +102,8 @@ class ReduceTranslationTest {
 
       val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
 
-      initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) print()
+      initialData.groupBy { _._2 }. reduce { (v1, v2) => v1 } setParallelism(4) output(
+        new DiscardingOutputFormat[(Double, String, Long)])
 
       val p = env.createProgramPlan()
       val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 07300da..e22e0ef 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
 import org.apache.hadoop.fs.Path;
@@ -445,13 +446,32 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		yc.init(yarnConfiguration);
 		yc.start();
 
+		// get temporary folder for writing output of wordcount example
+		File tmpOutFolder = null;
+		try{
+			tmpOutFolder = tmp.newFolder();
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		// get temporary file for reading input data for wordcount example
+		File tmpInFile = null;
+		try{
+			tmpInFile = tmp.newFile();
+			FileUtils.writeStringToFile(tmpInFile,WordCountData.TEXT);
+		}
+		catch(IOException e) {
+			throw new RuntimeException(e);
+		}
+
 		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
 						"-yn", "1",
 						"-yjm", "512",
 						"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
 						"-ytm", "1024",
 						"-ys", "2", // test requesting slots from YARN.
-						"--yarndetached", job},
+						"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
 				"The Job has been submitted with JobID",
 				RunTypes.CLI_FRONTEND);
 
@@ -490,19 +510,26 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			final ApplicationId id = tmpAppId;
 
 			// now it has finished.
-			// check the output.
-			File taskmanagerOut = YarnTestBase.findFile("..", new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.contains("taskmanager") && name.contains("stdout") && dir.getAbsolutePath().contains(id.toString());
+			// check the output files.
+			File[] listOfOutputFiles = tmpOutFolder.listFiles();
+
+
+			Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles);
+			LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder );
+
+			// read all output files in output folder to one output string
+			String content = "";
+			for(File f:listOfOutputFiles)
+			{
+				if(f.isFile())
+				{
+					content += FileUtils.readFileToString(f) + "\n";
 				}
-			});
-			Assert.assertNotNull("Taskmanager output not found", taskmanagerOut);
-			LOG.info("The job has finished. TaskManager output file found {}", taskmanagerOut.getAbsolutePath());
-			String content = FileUtils.readFileToString(taskmanagerOut);
+			}
+			//String content = FileUtils.readFileToString(taskmanagerOut);
 			// check for some of the wordcount outputs.
-			Assert.assertTrue("Expected string '(all,2)' not found in string '"+content+"'", content.contains("(all,2)"));
-			Assert.assertTrue("Expected string '(mind,1)' not found in string'"+content+"'", content.contains("(mind,1)"));
+			Assert.assertTrue("Expected string 'da 5' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)"));
+			Assert.assertTrue("Expected string 'der 29' or '(mind,1)' not found in string'"+content+"'",content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)"));
 
 			// check if the heap size for the TaskManager was set correctly
 			File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {


[3/5] flink git commit: [FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.

Posted by se...@apache.org.
[FLINK-1418] [apis] Change print() to print on the client and to eagerly execute the program.

print() now uses collect() internally


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9c15620
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9c15620
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9c15620

Branch: refs/heads/master
Commit: e9c1562034dabc34fa46d4fd8411321db0a6c637
Parents: 939e3fc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 28 10:52:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 13:12:45 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 38 ++++++++++++--------
 .../flink/api/java/MultipleInvokationsTest.java |  2 +-
 .../optimizer/DistinctCompilationTest.java      |  6 ++--
 .../java/GroupReduceCompilationTest.java        | 12 +++----
 .../optimizer/java/ReduceCompilationTest.java   |  8 ++---
 .../org/apache/flink/api/scala/DataSet.scala    |  9 ++---
 6 files changed, 44 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 18cb01d..4f2942e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.api.java;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -50,7 +47,6 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
 import org.apache.flink.api.java.operators.AggregateOperator;
@@ -86,8 +82,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.ExceptionUtils;
 
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * A DataSet represents a collection of elements of the same type.<br/>
@@ -1336,11 +1335,17 @@ public abstract class DataSet<T> {
 	/**
 	 * Writes a DataSet to the standard output stream (stdout).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 * 
-	 *  @return The DataSink that writes the DataSet.
+	 * This triggers execute() automatically.
 	 */
-	public DataSink<T> print() {
-		return output(new PrintingOutputFormat<T>(false));
+	public void print() {
+		try {
+			List<T> elements = this.collect();
+			for (T e: elements) {
+				System.out.println(e);
+			}
+		} catch (Exception e) {
+			System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+		}
 	}
 
 	/**
@@ -1357,11 +1362,16 @@ public abstract class DataSet<T> {
 	/**
 	 * Writes a DataSet to the standard error stream (stderr).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 * 
-	 * @return The DataSink that writes the DataSet.
 	 */
-	public DataSink<T> printToErr() {
-		return output(new PrintingOutputFormat<T>(true));
+	public void printToErr() {
+		try {
+			List<T> elements = this.collect();
+			for (T e: elements) {
+				System.err.println(e);
+			}
+		} catch (Exception e) {
+			System.err.println("Could not retrieve values for printing: " + e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index 3638f70..c0ca6c2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,7 @@ public class MultipleInvokationsTest {
 			// ----------- Execution 1 ---------------
 			
 			DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
-			data.print().name("print1");
+			data.print();
 			data.output(new DiscardingOutputFormat<String>()).name("output1");
 			
 			{

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 5827d9c..973f402 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -86,6 +85,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
+			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -146,6 +146,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
+			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -198,8 +199,9 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
+			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 1bd4b8a..8fb4ef0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -51,7 +51,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
 				public void reduce(Iterable<Double> values, Collector<Double> out) {}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -97,7 +97,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print().name("sink");
+			reduced.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -148,7 +148,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -199,7 +199,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print().name("sink");
+			reduced.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -257,7 +257,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
 				public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -317,7 +317,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			}).name("reducer");
 			
 			reduced.setCombinable(true);
-			reduced.print().name("sink");
+			reduced.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index 4197abb..2958f1a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -53,7 +53,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -98,7 +98,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return value1 + value2;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -151,7 +151,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -211,7 +211,7 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 					return null;
 				}
 			}).name("reducer")
-			.print().name("sink");
+			.print();
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9c15620/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 15d2f4e..5198157 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1323,9 +1323,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   /**
    * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
    * each element.
+   * This triggers execute() automatically.
    */
-  def print(): DataSink[T] = {
-    output(new PrintingOutputFormat[T](false))
+  def print() = {
+    javaSet.print()
   }
 
   /**
@@ -1342,8 +1343,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
    * each element.
    */
-  def printToErr(): DataSink[T] = {
-    output(new PrintingOutputFormat[T](true))
+  def printToErr() = {
+    javaSet.printToErr()
   }
 
   /**


[2/5] flink git commit: [FLINK-1418] [apis] Fix eager print() and adjust all tests and examples to not fail due to "eager" print method

Posted by se...@apache.org.
[FLINK-1418] [apis] Fix eager print() and adjust all tests and examples to not fail due to "eager" print method

 - Add lastJobExecutionResult for getting the result of the last execution, when executing "eager" execution methods

This closes #699


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78d954b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78d954b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78d954b8

Branch: refs/heads/master
Commit: 78d954b858933f8f8ecc1ace01839b4e6080def1
Parents: e9c1562
Author: Nikolaas Steenbergen <Ni...@googlemail.com>
Authored: Tue May 5 10:49:54 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 13:12:45 2015 +0200

----------------------------------------------------------------------
 .../client/program/ContextEnvironment.java      |   4 +-
 .../java/graph/ConnectedComponents.java         |   6 +-
 .../examples/java/graph/EnumTrianglesBasic.java |   6 +-
 .../examples/java/graph/EnumTrianglesOpt.java   |   6 +-
 .../examples/java/graph/PageRankBasic.java      |   4 +-
 .../flink/examples/java/misc/PiEstimation.java  |   2 -
 .../examples/java/ml/LinearRegression.java      |   5 +-
 .../relational/EmptyFieldsCountAccumulator.java |   7 +-
 .../java/relational/WebLogAnalysis.java         |   6 +-
 .../examples/java/wordcount/PojoExample.java    |   6 +-
 .../examples/java/wordcount/WordCount.java      |   5 +-
 .../examples/scala/clustering/KMeans.scala      |   2 +-
 .../scala/graph/ConnectedComponents.scala       |   2 +-
 .../examples/scala/graph/DeltaPageRank.scala    |   1 -
 .../scala/graph/EnumTrianglesBasic.scala        |   5 +-
 .../examples/scala/graph/EnumTrianglesOpt.scala |   4 +-
 .../examples/scala/graph/PageRankBasic.scala    |   5 +-
 .../scala/graph/TransitiveClosureNaive.scala    |   2 +-
 .../examples/scala/misc/PiEstimation.scala      |   2 -
 .../examples/scala/ml/LinearRegression.scala    |   2 +-
 .../scala/relational/WebLogAnalysis.scala       |   2 +-
 .../examples/scala/wordcount/WordCount.scala    |   2 +-
 .../flink/api/java/CollectionEnvironment.java   |   3 +-
 .../java/org/apache/flink/api/java/DataSet.java |  26 ++---
 .../flink/api/java/ExecutionEnvironment.java    |  13 ++-
 .../apache/flink/api/java/LocalEnvironment.java |   3 +-
 .../flink/api/java/RemoteEnvironment.java       |   4 +-
 .../flink/api/java/MultipleInvokationsTest.java |   3 +-
 .../SemanticPropertiesProjectionTest.java       |  18 ++-
 .../SemanticPropertiesTranslationTest.java      |  31 +++---
 .../translation/AggregateTranslationTest.java   |   3 +-
 .../translation/CoGroupSortTranslationTest.java |   5 +-
 .../DeltaIterationTranslationTest.java          |   3 +-
 .../translation/DistinctTranslationTest.java    |  13 ++-
 .../translation/ReduceTranslationTests.java     |   7 +-
 .../optimizer/DistinctCompilationTest.java      |   1 +
 .../WorksetIterationCornerCasesTest.java        |  22 +++-
 ...naryCustomPartitioningCompatibilityTest.java |   5 +-
 .../CoGroupCustomPartitioningTest.java          |   9 +-
 ...ustomPartitioningGlobalOptimizationTest.java |   5 +-
 .../custompartition/CustomPartitioningTest.java |   7 +-
 .../GroupingKeySelectorTranslationTest.java     |   9 +-
 .../GroupingPojoTranslationTest.java            | 111 ++++++++++---------
 .../GroupingTupleTranslationTest.java           |  11 +-
 .../JoinCustomPartitioningTest.java             |   9 +-
 .../java/DeltaIterationDependenciesTest.java    |   3 +-
 .../java/DistinctAndGroupingOptimizerTest.java  |   5 +-
 .../java/GroupReduceCompilationTest.java        |  13 ++-
 .../optimizer/java/IterationCompilerTest.java   |   9 +-
 .../optimizer/java/JoinTranslationTest.java     |   4 +-
 .../flink/optimizer/java/OpenIterationTest.java |  13 ++-
 .../optimizer/java/PartitionOperatorTest.java   |   5 +-
 .../optimizer/java/ReduceCompilationTest.java   |   9 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |   3 +-
 .../CoGroupOnConflictingPartitioningsTest.java  |   3 +-
 .../JoinOnConflictingPartitioningsTest.java     |   3 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   7 ++
 .../api/java/common/PlanBinder.java             |   3 +-
 .../flink/spargel/java/SpargelCompilerTest.java |   5 +-
 .../test/util/CollectionTestEnvironment.java    |   4 +-
 .../flink/test/util/JavaProgramTestBase.java    |   6 +-
 .../apache/flink/test/util/TestEnvironment.java |   7 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |   5 +-
 .../iterations/PageRankCompilerTest.java        |   3 +-
 .../scala/DeltaIterationSanityCheckTest.scala   |  23 ++--
 .../PartitionOperatorTranslationTest.scala      |   5 +-
 .../SemanticPropertiesTranslationTest.scala     |  16 ++-
 .../translation/AggregateTranslationTest.scala  |   5 +-
 .../CoGroupCustomPartitioningTest.scala         |   7 +-
 .../CoGroupGroupSortTranslationTest.scala       |   5 +-
 ...tomPartitioningGroupingKeySelectorTest.scala |  11 +-
 .../CustomPartitioningGroupingPojoTest.scala    |   9 +-
 .../CustomPartitioningGroupingTupleTest.scala   |  11 +-
 .../translation/CustomPartitioningTest.scala    |   7 +-
 .../DeltaIterationTranslationTest.scala         |   3 +-
 .../translation/DistinctTranslationTest.scala   |   3 +-
 .../JoinCustomPartitioningTest.scala            |   7 +-
 .../translation/ReduceTranslationTest.scala     |  10 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  51 +++++++--
 79 files changed, 396 insertions(+), 294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 35e5846..9287017 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -62,10 +62,12 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
 		JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
 		if(result instanceof JobExecutionResult) {
+			this.lastJobExecutionResult = (JobExecutionResult) result;
 			return (JobExecutionResult) result;
 		} else {
 			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+			this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+			return this.lastJobExecutionResult;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index bd74b20..827bb25 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -111,12 +111,12 @@ public class ConnectedComponents implements ProgramDescription {
 		// emit result
 		if(fileOutput) {
 			result.writeAsCsv(outputPath, "\n", " ");
+
+			// execute program
+			env.execute("Connected Components Example");
 		} else {
 			result.print();
 		}
-		
-		// execute program
-		env.execute("Connected Components Example");
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
index 5af60be..fdbe197 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -106,13 +106,13 @@ public class EnumTrianglesBasic {
 		// emit result
 		if(fileOutput) {
 			triangles.writeAsCsv(outputPath, "\n", ",");
+
+			// execute program
+			env.execute("Basic Triangle Enumeration Example");
 		} else {
 			triangles.print();
 		}
 
-		// execute program
-		env.execute("Basic Triangle Enumeration Example");
-
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
index fb1e6f5..56b448e 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -121,12 +121,12 @@ public class EnumTrianglesOpt {
 		// emit result
 		if(fileOutput) {
 			triangles.writeAsCsv(outputPath, "\n", ",");
+			// execute program
+			env.execute("Triangle Enumeration Example");
 		} else {
 			triangles.print();
 		}
-		
-		// execute program
-		env.execute("Triangle Enumeration Example");
+
 		
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
index d622799..a374d0c 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -122,12 +122,12 @@ public class PageRankBasic {
 		// emit result
 		if(fileOutput) {
 			finalPageRanks.writeAsCsv(outputPath, "\n", " ");
+			// execute program
+			env.execute("Basic Page Rank Example");
 		} else {
 			finalPageRanks.print();
 		}
 
-		// execute program
-		env.execute("Basic Page Rank Example");
 		
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
index 83cf9d9..2780bb1 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -70,8 +70,6 @@ public class PiEstimation implements java.io.Serializable {
 
 		System.out.println("We estimate Pi to be:");
 		pi.print();
-
-		env.execute();
 	}
 
 	//*************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 86b7de2..46873f6 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -104,12 +104,13 @@ public class LinearRegression {
 		// emit result
 		if(fileOutput) {
 			result.writeAsText(outputPath);
+			// execute program
+			env.execute("Linear Regression example");
 		} else {
 			result.print();
 		}
 
-		// execute program
-		env.execute("Linear Regression example");
+
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 2016eaa..9f6f567 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -72,16 +72,17 @@ public class EmptyFieldsCountAccumulator {
 		final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
 
 		// Here, we could do further processing with the filtered lines...
-		
+		JobExecutionResult result = null;
 		// output the filtered lines
 		if (outputPath == null) {
 			filteredLines.print();
+			result = env.getLastJobExecutionResult();
 		} else {
 			filteredLines.writeAsCsv(outputPath);
+			// execute program
+			result = env.execute("Accumulator example");
 		}
 
-		// execute program
-		final JobExecutionResult result = env.execute("Accumulator example");
 
 		// get the accumulator result via its registration key
 		final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 890af65..9425291 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -140,13 +140,11 @@ public class WebLogAnalysis {
 		// emit result
 		if(fileOutput) {
 			result.writeAsCsv(outputPath, "\n", "|");
+			// execute program
+			env.execute("WebLogAnalysis Example");
 		} else {
 			result.print();
 		}
-
-		// execute program
-		env.execute("WebLogAnalysis Example");
-		
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
index f3364fd..b001d12 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
@@ -97,12 +97,12 @@ public class PojoExample {
 		
 		if(fileOutput) {
 			counts.writeAsText(outputPath, WriteMode.OVERWRITE);
+			// execute program
+			env.execute("WordCount-Pojo Example");
 		} else {
 			counts.print();
 		}
-		
-		// execute program
-		env.execute("WordCount-Pojo Example");
+
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 7db7946..82c3ad8 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -74,12 +74,13 @@ public class WordCount {
 		// emit result
 		if(fileOutput) {
 			counts.writeAsCsv(outputPath, "\n", " ");
+			// execute program
+			env.execute("WordCount Example");
 		} else {
 			counts.print();
 		}
 		
-		// execute program
-		env.execute("WordCount Example");
+
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 26d01c3..08a3e62 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
@@ -96,12 +96,12 @@ object KMeans {
 
     if (fileOutput) {
       clusteredPoints.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala KMeans Example")
     }
     else {
       clusteredPoints.print()
     }
 
-    env.execute("Scala KMeans Example")
   }
 
   private def parseParameters(programArguments: Array[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
index e75c862..9e23ed7 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
@@ -98,11 +98,11 @@ object ConnectedComponents {
     }
     if (fileOutput) {
       verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala Connected Components Example")
     } else {
       verticesWithComponents.print()
     }
 
-    env.execute("Scala Connected Components Example")
   }
  
   private def parseParameters(args: Array[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
index b4955ed..ae8a982 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
@@ -100,6 +100,5 @@ object DeltaPageRank {
 
     iteration.print()
 
-    env.execute("Page Rank")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index a62786c..a9000b3 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -91,12 +91,13 @@ object EnumTrianglesBasic {
     // emit result
     if (fileOutput) {
       triangles.writeAsCsv(outputPath, "\n", ",")
+      // execute program
+      env.execute("TriangleEnumeration Example")
     } else {
       triangles.print()
     }
     
-    // execute program
-    env.execute("TriangleEnumeration Example")
+
   }
 
   // *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
index 244e968..cc7c33f 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -117,12 +117,12 @@ object EnumTrianglesOpt {
     // emit result
     if (fileOutput) {
       triangles.writeAsCsv(outputPath, "\n", ",")
+      // execute program
+      env.execute("TriangleEnumeration Example")
     } else {
       triangles.print()
     }
 
-    // execute program
-    env.execute("TriangleEnumeration Example")
   }
 
   // *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index a3ea4b3..5b5f6c2 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -128,12 +128,11 @@ object PageRankBasic {
     // emit result
     if (fileOutput) {
       result.writeAsCsv(outputPath, "\n", " ")
+      // execute program
+      env.execute("Basic PageRank Example")
     } else {
       result.print()
     }
-
-    // execute program
-    env.execute("Basic PageRank Example")
   }
 
   // *************************************************************************

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 727cb47..3de0f2e 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -57,11 +57,11 @@ object  TransitiveClosureNaive {
 
     if (fileOutput) {
       paths.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala Transitive Closure Example")
     } else {
       paths.print()
     }
 
-    env.execute("Scala Transitive Closure Example")
 
 
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
index 582dd4f..3453ee8 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
@@ -47,8 +47,6 @@ object PiEstimation {
     println("We estimate Pi to be:")
 
     pi.print()
-
-    env.execute("PiEstimation example")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index fb99a00..2a7b786 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -82,11 +82,11 @@ object LinearRegression {
 
     if (fileOutput) {
       result.writeAsText(outputPath)
+      env.execute("Scala Linear Regression example")
     }
     else {
       result.print()
     }
-    env.execute("Scala Linear Regression example")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
index 950e3c8..5392594 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -123,11 +123,11 @@ object WebLogAnalysis {
     // emit result
     if (fileOutput) {
       result.writeAsCsv(outputPath, "\n", "|")
+      env.execute("Scala WebLogAnalysis Example")
     } else {
       result.print()
     }
 
-    env.execute("Scala WebLogAnalysis Example")
   }
 
   private var fileOutput: Boolean = false

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
index b5c2ee2..7d5db7e 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
@@ -57,11 +57,11 @@ object WordCount {
 
     if (fileOutput) {
       counts.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala WordCount Example")
     } else {
       counts.print()
     }
 
-    env.execute("Scala WordCount Example")
   }
 
   private def parseParameters(args: Array[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
index b48debc..51e91d7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java
@@ -30,7 +30,8 @@ public class CollectionEnvironment extends ExecutionEnvironment {
 
 		// We need to reverse here. Object-Reuse enabled, means safe mode is disabled.
 		CollectionExecutor exec = new CollectionExecutor(getConfig());
-		return exec.execute(p);
+		this.lastJobExecutionResult = exec.execute(p);
+		return this.lastJobExecutionResult;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 4f2942e..133a083 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
 import org.apache.flink.api.java.operators.AggregateOperator;
@@ -82,7 +83,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -1337,14 +1337,10 @@ public abstract class DataSet<T> {
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
 	 * This triggers execute() automatically.
 	 */
-	public void print() {
-		try {
-			List<T> elements = this.collect();
-			for (T e: elements) {
-				System.out.println(e);
-			}
-		} catch (Exception e) {
-			System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+	public void print() throws Exception{
+		List<T> elements = this.collect();
+		for (T e: elements) {
+			System.out.println(e);
 		}
 	}
 
@@ -1363,14 +1359,10 @@ public abstract class DataSet<T> {
 	 * Writes a DataSet to the standard error stream (stderr).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
 	 */
-	public void printToErr() {
-		try {
-			List<T> elements = this.collect();
-			for (T e: elements) {
-				System.err.println(e);
-			}
-		} catch (Exception e) {
-			System.err.println("Could not retrieve values for printing: " + e);
+	public void printToErr() throws Exception{
+		List<T> elements = this.collect();
+		for (T e: elements) {
+			System.err.println(e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 75d4387..0f65b79 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,6 +94,9 @@ import com.google.common.base.Preconditions;
  */
 public abstract class ExecutionEnvironment {
 
+
+	protected JobExecutionResult lastJobExecutionResult;
+
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 	
 	/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -234,7 +237,15 @@ public abstract class ExecutionEnvironment {
 	public UUID getId() {
 		return this.executionId;
 	}
-	
+
+	/**
+	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+	 */
+	public JobExecutionResult getLastJobExecutionResult(){
+		return this.lastJobExecutionResult;
+	}
+
+
 	/**
 	 * Gets the UUID by which this environment is identified, as a string.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 25042b6..27b6254 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -51,7 +51,8 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		
 		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		return executor.executePlan(p);
+		this.lastJobExecutionResult = executor.executePlan(p);
+		return this.lastJobExecutionResult;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index c9a4fe0..515037c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -67,7 +67,9 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		
 		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		return executor.executePlan(p);
+
+		this.lastJobExecutionResult = executor.executePlan(p);
+		return this.lastJobExecutionResult;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index c0ca6c2..4fc51bb 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,8 @@ public class MultipleInvokationsTest {
 			// ----------- Execution 1 ---------------
 			
 			DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
-			data.print();
+			//data.print();
+			data.output(new DiscardingOutputFormat<String>()).name("print1");
 			data.output(new DiscardingOutputFormat<String>()).name("output1");
 			
 			{

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 341d87e..e890b4e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -28,11 +28,9 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.Test;
 
@@ -73,7 +71,7 @@ public class SemanticPropertiesProjectionTest {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		tupleDs.project(1, 3, 2).project(0, 3).print();
+		tupleDs.project(1, 3, 2).project(0, 3).output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -99,7 +97,7 @@ public class SemanticPropertiesProjectionTest {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
 
-		tupleDs.project(2, 3, 1).project(2).print();
+		tupleDs.project(2, 3, 1).project(2).output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -135,7 +133,7 @@ public class SemanticPropertiesProjectionTest {
 		tupleDs.join(tupleDs).where(0).equalTo(0)
 				.projectFirst(2, 3)
 				.projectSecond(1, 4)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -163,7 +161,7 @@ public class SemanticPropertiesProjectionTest {
 		tupleDs.join(tupleDs).where(0).equalTo(0)
 				.projectFirst(2,0)
 				.projectSecond(1,3)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -212,7 +210,7 @@ public class SemanticPropertiesProjectionTest {
 		tupleDs.cross(tupleDs)
 				.projectFirst(2, 3)
 				.projectSecond(1, 4)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 
@@ -240,7 +238,7 @@ public class SemanticPropertiesProjectionTest {
 		tupleDs.cross(tupleDs)
 				.projectFirst(2, 0)
 				.projectSecond(1,3)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple>());
 
 		Plan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index f0124e3..33b3958 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.junit.Test;
@@ -55,7 +56,7 @@ public class SemanticPropertiesTranslationTest {
 
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
-		input.map(new WildcardForwardedMapper<Tuple3<Long,String,Integer>>()).print();
+		input.map(new WildcardForwardedMapper<Tuple3<Long,String,Integer>>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -80,7 +81,7 @@ public class SemanticPropertiesTranslationTest {
 
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42));
-		input.map(new IndividualForwardedMapper<Long, String, Integer>()).print();
+		input.map(new IndividualForwardedMapper<Long, String, Integer>()).output(new DiscardingOutputFormat<Tuple3<Long, String, Integer>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -102,7 +103,7 @@ public class SemanticPropertiesTranslationTest {
 
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
-		input.map(new ShufflingMapper<Long>()).print();
+		input.map(new ShufflingMapper<Long>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -128,7 +129,7 @@ public class SemanticPropertiesTranslationTest {
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
 		input.map(new NoAnnotationMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -151,7 +152,7 @@ public class SemanticPropertiesTranslationTest {
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
 		input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -174,7 +175,7 @@ public class SemanticPropertiesTranslationTest {
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
 		input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -196,7 +197,7 @@ public class SemanticPropertiesTranslationTest {
 
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
-		input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).print();
+		input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -218,7 +219,7 @@ public class SemanticPropertiesTranslationTest {
 
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
-		input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).print();
+		input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -260,7 +261,7 @@ public class SemanticPropertiesTranslationTest {
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple2<Long, Double>> input2 = env.fromElements(new Tuple2<Long, Double>(3l, 3.1415));
 		input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, String, Long, Double>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<String, Double>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -287,7 +288,7 @@ public class SemanticPropertiesTranslationTest {
 		DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
 		input1.join(input2).where(0).equalTo(0).with(new NoAnnotationJoin<Long>())
 				.withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -314,7 +315,7 @@ public class SemanticPropertiesTranslationTest {
 		DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
 		input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>())
 				.withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -347,7 +348,7 @@ public class SemanticPropertiesTranslationTest {
 		DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
 		input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin<Long>())
 				.withForwardedFieldsSecond("1")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -377,7 +378,7 @@ public class SemanticPropertiesTranslationTest {
 		DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
 		input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin<Long>())
 				.withForwardedFieldsFirst("0->1")
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -405,7 +406,7 @@ public class SemanticPropertiesTranslationTest {
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l));
 		input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
@@ -435,7 +436,7 @@ public class SemanticPropertiesTranslationTest {
 		@SuppressWarnings("unchecked")
 		DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l));
 		input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		Plan plan = env.createProgramPlan();
 
 		GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 63b4052..0ce79e3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.types.StringValue;
 import org.junit.Test;
@@ -46,7 +47,7 @@ public class AggregateTranslationTest {
 			DataSet<Tuple3<Double, StringValue, Long>> initialData = 
 					env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));
 			
-			initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print();
+			initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
index 2fe9965..887173d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
@@ -54,7 +55,7 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 							Collector<Long> out) {}
 				})
 				
-				.print();
+				.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -98,7 +99,7 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
 					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
 				})
 				
-				.print();
+				.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index ae89780..f9ce82f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -91,7 +92,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 						joined,
 						joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP));
 				
-				result.print();
+				result.output(new DiscardingOutputFormat<Tuple3<Double, Long, String>>());
 				result.writeAsText("/dev/null");
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index b7fbb78..9824ee1 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -60,7 +61,7 @@ public class DistinctTranslationTest {
 				public String getKey(String value) { return value; }
 			});
 			
-			op.print();
+			op.output(new DiscardingOutputFormat<String>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -81,7 +82,7 @@ public class DistinctTranslationTest {
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
-			initialData.distinct().print();
+			initialData.distinct().output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
 
 			Plan p = env.createProgramPlan();
 
@@ -117,7 +118,7 @@ public class DistinctTranslationTest {
 
 			DataSet<CustomType> initialData = getSourcePojoDataSet(env);
 
-			initialData.distinct().print();
+			initialData.distinct().output(new DiscardingOutputFormat<CustomType>());
 
 			Plan p = env.createProgramPlan();
 
@@ -153,7 +154,7 @@ public class DistinctTranslationTest {
 
 			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
 
-			initialData.distinct(1, 2).print();
+			initialData.distinct(1, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
 
 			Plan p = env.createProgramPlan();
 
@@ -193,7 +194,7 @@ public class DistinctTranslationTest {
 				public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
 					return value.f1;
 				}
-			}).setParallelism(4).print();
+			}).setParallelism(4).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
 
 			Plan p = env.createProgramPlan();
 
@@ -237,7 +238,7 @@ public class DistinctTranslationTest {
 
 			DataSet<CustomType> initialData = getSourcePojoDataSet(env);
 
-			initialData.distinct("myInt").print();
+			initialData.distinct("myInt").output(new DiscardingOutputFormat<CustomType>());
 
 			Plan p = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index b578eb7..b555844 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -54,7 +55,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 				public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
 					return value1;
 				}
-			}).print();
+			}).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -96,7 +97,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 						return value1;
 					}
 				})
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -143,7 +144,7 @@ public class ReduceTranslationTests implements java.io.Serializable {
 						return value1;
 					}
 				}).setParallelism(4)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
 			
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 973f402..20a4ef6 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 32bd6e9..ccc9b13 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
@@ -42,13 +41,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
-			DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
 			
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
 			
-			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
 			iteration.closeWith(iterEnd, iterEnd)
-					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -64,5 +63,18 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-
+	
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
+	
+	private static final class TestMapper<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index b4e95fb..9a60943 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -57,7 +58,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
 			input1.partitionCustom(partitioner, 1)
 				.join(input2.partitionCustom(partitioner, 0))
 				.where(1).equalTo(0)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -102,7 +103,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase
 				.coGroup(input2.partitionCustom(partitioner, 0))
 				.where(1).equalTo(0)
 				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index 346e702..14d2a96 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -57,7 +58,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 				.where(1).equalTo(0)
 				.withPartitioner(partitioner)
 				.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple3<Long, Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -118,7 +119,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 				.where("b").equalTo("a")
 				.withPartitioner(partitioner)
 				.with(new DummyCoGroupFunction<Pojo2, Pojo3>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -180,7 +181,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 				.where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector())
 				.withPartitioner(partitioner)
 				.with(new DummyCoGroupFunction<Pojo2, Pojo3>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Pojo2, Pojo3>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -251,7 +252,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 			grouped
 				.coGroup(partitioned).where(0).equalTo(0)
 				.with(new DummyCoGroupFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 17a7659..bc2eb82 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
@@ -57,8 +58,8 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
 				.withPartitioner(partitioner);
 
 			joined.groupBy(1).withPartitioner(partitioner)
-				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
-				.print();
+					.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index 00fd587..1aca046 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -53,7 +54,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			data
 				.partitionCustom(part, 0)
 				.mapPartition(new IdentityPartitionerMapper<Tuple2<Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -123,7 +124,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			data
 				.partitionCustom(part, "a")
 				.mapPartition(new IdentityPartitionerMapper<Pojo>())
-				.print();
+				.output(new DiscardingOutputFormat<Pojo>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -193,7 +194,7 @@ public class CustomPartitioningTest extends CompilerTestBase {
 			data
 				.partitionCustom(part, new TestKeySelectorInt<Pojo>())
 				.mapPartition(new IdentityPartitionerMapper<Pojo>())
-				.print();
+				.output(new DiscardingOutputFormat<Pojo>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 23f4812..b2bfc67 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -53,7 +54,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
 				.reduce(new DummyReducer<Tuple2<Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -84,8 +85,8 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
-				.print();
+					.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -116,7 +117,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 				.withPartitioner(new TestPartitionerInt())
 				.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
 				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 54033ac..dc2b147 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -37,26 +38,26 @@ import org.junit.Test;
 
 @SuppressWarnings("serial")
 public class GroupingPojoTranslationTest extends CompilerTestBase {
-	
+
 	@Test
 	public void testCustomPartitioningTupleReduce() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo2> data = env.fromElements(new Pojo2())
 					.rebalance().setParallelism(4);
-			
+
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.reduce(new DummyReducer<Pojo2>())
-				.print();
-			
+					.reduce(new DummyReducer<Pojo2>())
+					.output(new DiscardingOutputFormat<Pojo2>());
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
 			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-			
+
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -66,26 +67,26 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCustomPartitioningTupleGroupReduce() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo2> data = env.fromElements(new Pojo2())
 					.rebalance().setParallelism(4);
-			
+
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
-				.print();
-			
+					.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
+					.output(new DiscardingOutputFormat<Pojo2>());
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
 			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-			
+
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -95,27 +96,27 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCustomPartitioningTupleGroupReduceSorted() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo3> data = env.fromElements(new Pojo3())
 					.rebalance().setParallelism(4);
-			
+
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.sortGroup("b", Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
-				.print();
-			
+					.sortGroup("b", Order.ASCENDING)
+					.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
+					.output(new DiscardingOutputFormat<Pojo3>());
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
 			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-			
+
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -125,28 +126,28 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCustomPartitioningTupleGroupReduceSorted2() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo4> data = env.fromElements(new Pojo4())
 					.rebalance().setParallelism(4);
-			
+
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.sortGroup("b", Order.ASCENDING)
-				.sortGroup("c", Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
-				.print();
-			
+					.sortGroup("b", Order.ASCENDING)
+					.sortGroup("c", Order.DESCENDING)
+					.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
+					.output(new DiscardingOutputFormat<Pojo4>());
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
-			
+
 			SinkPlanNode sink = op.getDataSinks().iterator().next();
 			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
 			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();
-			
+
 			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
 			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
@@ -156,15 +157,15 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCustomPartitioningTupleInvalidType() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo2> data = env.fromElements(new Pojo2())
 					.rebalance().setParallelism(4);
-			
+
 			try {
 				data.groupBy("a").withPartitioner(new TestPartitionerLong());
 				fail("Should throw an exception");
@@ -176,19 +177,19 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCustomPartitioningTupleInvalidTypeSorted() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo3> data = env.fromElements(new Pojo3())
 					.rebalance().setParallelism(4);
-			
+
 			try {
 				data.groupBy("a")
-					.sortGroup("b", Order.ASCENDING)
-					.withPartitioner(new TestPartitionerLong());
+						.sortGroup("b", Order.ASCENDING)
+						.withPartitioner(new TestPartitionerLong());
 				fail("Should throw an exception");
 			}
 			catch (InvalidProgramException e) {}
@@ -198,18 +199,18 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCustomPartitioningTupleRejectCompositeKey() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Pojo2> data = env.fromElements(new Pojo2())
 					.rebalance().setParallelism(4);
-			
+
 			try {
 				data.groupBy("a", "b")
-					.withPartitioner(new TestPartitionerInt());
+						.withPartitioner(new TestPartitionerInt());
 				fail("Should throw an exception");
 			}
 			catch (InvalidProgramException e) {}
@@ -219,39 +220,39 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static class Pojo2 {
 		public int a;
 		public int b;
-		
+
 	}
-	
+
 	public static class Pojo3 {
 		public int a;
 		public int b;
 		public int c;
 	}
-	
+
 	public static class Pojo4 {
 		public int a;
 		public int b;
 		public int c;
 		public int d;
 	}
-	
+
 	private static class TestPartitionerInt implements Partitioner<Integer> {
 		@Override
 		public int partition(Integer key, int numPartitions) {
 			return 0;
 		}
 	}
-	
+
 	private static class TestPartitionerLong implements Partitioner<Long> {
 		@Override
 		public int partition(Long key, int numPartitions) {
 			return 0;
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78d954b8/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 49f44f5..6eb5ad5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -51,7 +52,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -80,7 +81,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.reduce(new DummyReducer<Tuple2<Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -109,7 +110,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -139,7 +140,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
 				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -170,7 +171,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 				.sortGroup(1, Order.ASCENDING)
 				.sortGroup(2, Order.DESCENDING)
 				.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
-				.print();
+				.output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);


[5/5] flink git commit: [FLINK-2071] [java api] Fix serializability issue with projectsion function.

Posted by se...@apache.org.
[FLINK-2071] [java api] Fix serializability issue with projectsion function.

Improve type safety.
Minor cleanups in ProjectOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad1d9362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad1d9362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad1d9362

Branch: refs/heads/master
Commit: ad1d9362c88343972b19fdcce0b041d8380805a9
Parents: 6220f34
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 14:47:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200

----------------------------------------------------------------------
 .../api/java/operators/ProjectOperator.java     | 27 +++++++------
 .../translation/PlanProjectOperator.java        | 41 +++++++++++++-------
 2 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9b7d567..55e182f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -82,7 +82,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	 * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
 	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
 	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
-	 * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+	 * Additional fields can be added to the projection by calling this method repeatedly.
 	 *
 	 * <b>Note: With the current implementation, the Project transformation looses type information.</b>
 	 *
@@ -94,8 +94,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	 * @see DataSet
 	 * @see ProjectOperator
 	 */
-	@SuppressWarnings("hiding")
-	public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
+	public <R extends Tuple> ProjectOperator<?, R> project(int... fieldIndexes) {
 		proj.acceptAdditionalIndexes(fieldIndexes);
 		
 		return proj.projectTupleX();
@@ -103,10 +102,10 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	/**
 	 * Deprecated method only kept for compatibility.
 	 */
-	@SuppressWarnings({ "unchecked", "hiding" })
+	@SuppressWarnings("unchecked")
 	@Deprecated
-	public <OUT extends Tuple> ProjectOperator<IN, OUT> types(Class<?>... types) {
-		TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+	public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
+		TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
 
 		if(types.length != typeInfo.getArity()) {
 			throw new InvalidProgramException("Provided types do not match projection.");
@@ -117,12 +116,12 @@ public class ProjectOperator<IN, OUT extends Tuple>
 				throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
 			}
 		}
-		return (ProjectOperator<IN, OUT>) this;
+		return (ProjectOperator<IN, R>) this;
 	}
 	
 	public static class Projection<T> {
 		
-		private final DataSet<T> ds;		
+		private final DataSet<T> ds;
 		private int[] fieldIndexes;
 		
 		public Projection(DataSet<T> ds, int[] fieldIndexes) {
@@ -138,9 +137,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 						"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
 			}
 			
-			int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
-			for(int i=0; i<fieldIndexes.length; i++) {
-				Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
+			int maxFieldIndex = ds.getType().getArity();
+			for (int fieldIndexe : fieldIndexes) {
+				Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
 			}
 			
 			this.ds = ds;
@@ -160,8 +159,8 @@ public class ProjectOperator<IN, OUT extends Tuple>
 			
 			this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + additionalIndexes.length);
 			
-			int maxFieldIndex = ((TupleTypeInfo<?>)ds.getType()).getArity();
-			for(int i=0; i<additionalIndexes.length; i++) {
+			int maxFieldIndex = ds.getType().getArity();
+			for (int i = 0; i < additionalIndexes.length; i++) {
 				Preconditions.checkElementIndex(additionalIndexes[i], maxFieldIndex);
 
 				this.fieldIndexes[offset + i] = additionalIndexes[i];
@@ -186,7 +185,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {
-			ProjectOperator<T, OUT> projOperator = null;
+			ProjectOperator<T, OUT> projOperator;
 
 			switch (fieldIndexes.length) {
 			case 1: projOperator = (ProjectOperator<T, OUT>) projectTuple1(); break;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad1d9362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 959b929..101b89b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -28,32 +28,47 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
 
-	public PlanProjectOperator(int[] fields, String name, TypeInformation<T> inType, TypeInformation<R> outType, ExecutionConfig executionConfig) {
-		super(new MapProjector<T, R>(fields, outType.createSerializer(executionConfig).createInstance()), new UnaryOperatorInformation<T, R>(inType, outType), name);
+	public PlanProjectOperator(int[] fields, String name,
+								TypeInformation<T> inType, TypeInformation<R> outType,
+								ExecutionConfig executionConfig)
+	{
+		super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
+		return (MapFunction<T, R>) new MapProjector<X, R>(fields);
 	}
 	
-	public static final class MapProjector<T, R extends Tuple>
-		extends AbstractRichFunction
-		implements MapFunction<T, R>
+	
+	public static final class MapProjector<T extends Tuple, R extends Tuple> 
+			extends AbstractRichFunction implements MapFunction<T, R>
 	{
 		private static final long serialVersionUID = 1L;
 		
 		private final int[] fields;
-		private final R outTuple;
+		private final Tuple outTuple;
 		
-		private MapProjector(int[] fields, R outTupleInstance) {
+		private MapProjector(int[] fields) {
 			this.fields = fields;
-			this.outTuple = outTupleInstance;
+			try {
+				this.outTuple = Tuple.getTupleClass(fields.length).newInstance();
+			}
+			catch (Exception e) {
+				// this should never happen
+				throw new RuntimeException(e);
+			}
 		}
 
 		// TODO We should use code generation for this.
+		@SuppressWarnings("unchecked")
 		@Override
-		public R map(T inTuple) throws Exception {
-			
-			for(int i=0; i<fields.length; i++) {
-				outTuple.setField(((Tuple)inTuple).getField(fields[i]), i);
+		public R map(Tuple inTuple) throws Exception {
+			for (int i = 0; i < fields.length; i++) {
+				outTuple.setField(inTuple.getField(fields[i]), i);
 			}
-			return outTuple;
+			
+			return (R) outTuple;
 		}
 	}
 }


[4/5] flink git commit: [FLINK-1418] [apis] Minor cleanups for eager on-client print() statement.

Posted by se...@apache.org.
[FLINK-1418] [apis] Minor cleanups for eager on-client print() statement.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6220f34b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6220f34b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6220f34b

Branch: refs/heads/master
Commit: 6220f34b626e9ed92b190042e0d69020e19461f1
Parents: 78d954b
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 21 13:49:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 21 14:50:32 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 51 ++++++++++++--------
 .../flink/api/java/ExecutionEnvironment.java    | 10 ++--
 .../org/apache/flink/api/scala/DataSet.scala    | 41 ++++++++++------
 .../flink/api/scala/ExecutionEnvironment.scala  |  6 +--
 4 files changed, 66 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 133a083..157c666 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -338,7 +338,7 @@ public abstract class DataSet<T> {
 	 *
 	 * @see org.apache.flink.api.java.operators.AggregateOperator
 	 */
-	public AggregateOperator<T> sum (int field) {
+	public AggregateOperator<T> sum(int field) {
 		return aggregate(Aggregations.SUM, field);
 	}
 
@@ -997,7 +997,7 @@ public abstract class DataSet<T> {
 	 * <pre>
 	 * {@code
 	 * DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
-	 *                                                  initialState.iterateDelta(initialFeedbakSet, 100, 0);
+	 *                                                  initialState.iterateDelta(initialFeedbackSet, 100, 0);
 	 * 
 	 * DataSet<Tuple2<Long, Long>> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
 	 *                                              .join(iteration.getSolutionSet()).where(0).equalTo(0)
@@ -1236,7 +1236,7 @@ public abstract class DataSet<T> {
 	 * @see TextOutputFormat
 	 */
 	public DataSink<String> writeAsFormattedText(String filePath, TextFormatter<T> formatter) {
-		return this.map(new FormattingMapper<T>(formatter)).writeAsText(filePath);
+		return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath);
 	}
 
 	/**
@@ -1250,8 +1250,8 @@ public abstract class DataSet<T> {
 	 *
 	 * @see TextOutputFormat
 	 */
-	public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter<T> formatter) {
-		return this.map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
+	public DataSink<String> writeAsFormattedText(String filePath, WriteMode writeMode, TextFormatter<T> formatter) {
+		return map(new FormattingMapper<T>(clean(formatter))).writeAsText(filePath, writeMode);
 	}
 	
 	/**
@@ -1333,11 +1333,16 @@ public abstract class DataSet<T> {
 	}
 	
 	/**
-	 * Writes a DataSet to the standard output stream (stdout).<br/>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 * This triggers execute() automatically.
+	 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
+	 * the print() method. For programs that are executed in a cluster, this method needs
+	 * to gather the contents of the DataSet back to the client, to print it there.
+	 * 
+	 * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+	 * 
+	 * <p>This method immediately triggers the program execution, similar to the
+	 * {@link #collect()} and {@link #count()} methods.</p>
 	 */
-	public void print() throws Exception{
+	public void print() throws Exception {
 		List<T> elements = this.collect();
 		for (T e: elements) {
 			System.out.println(e);
@@ -1345,6 +1350,23 @@ public abstract class DataSet<T> {
 	}
 
 	/**
+	 * Prints the elements in a DataSet to the standard error stream {@link System#err} of the JVM that calls
+	 * the print() method. For programs that are executed in a cluster, this method needs
+	 * to gather the contents of the DataSet back to the client, to print it there.
+	 *
+	 * <p>The string written for each element is defined by the {@link Object#toString()} method.</p>
+	 *
+	 * <p>This method immediately triggers the program execution, similar to the
+	 * {@link #collect()} and {@link #count()} methods.</p>
+	 */
+	public void printToErr() throws Exception {
+		List<T> elements = this.collect();
+		for (T e: elements) {
+			System.err.println(e);
+		}
+	}
+	
+	/**
 	 * Writes a DataSet to the standard output stream (stdout).<br/>
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
 	 *
@@ -1354,17 +1376,6 @@ public abstract class DataSet<T> {
 	public DataSink<T> print(String sinkIdentifier) {
 		return output(new PrintingOutputFormat<T>(sinkIdentifier, false));
 	}
-	
-	/**
-	 * Writes a DataSet to the standard error stream (stderr).<br/>
-	 * For each element of the DataSet the result of {@link Object#toString()} is written.
-	 */
-	public void printToErr() throws Exception{
-		List<T> elements = this.collect();
-		for (T e: elements) {
-			System.err.println(e);
-		}
-	}
 
 	/**
 	 * Writes a DataSet to the standard error stream (stderr).<br/>

http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 0f65b79..9c76409 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,9 +94,6 @@ import com.google.common.base.Preconditions;
  */
 public abstract class ExecutionEnvironment {
 
-
-	protected JobExecutionResult lastJobExecutionResult;
-
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 	
 	/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -116,8 +113,11 @@ public abstract class ExecutionEnvironment {
 	
 	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
-	private ExecutionConfig config = new ExecutionConfig();
+	private final ExecutionConfig config = new ExecutionConfig();
 
+	/** Result from the latest execution, to be make it retrievable when using eager execution methods */
+	protected JobExecutionResult lastJobExecutionResult;
+	
 	/** Flag to indicate whether sinks have been cleared in previous executions */
 	private boolean wasExecuted = false;
 
@@ -240,6 +240,8 @@ public abstract class ExecutionEnvironment {
 
 	/**
 	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+	 * 
+	 * @return The execution result from the latest job execution.
 	 */
 	public JobExecutionResult getLastJobExecutionResult(){
 		return this.lastJobExecutionResult;

http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5198157..e283e95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1319,16 +1319,37 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   def output(outputFormat: OutputFormat[T]): DataSink[T] = {
     javaSet.output(outputFormat)
   }
-
+  
   /**
-   * Writes a DataSet to the standard output stream (stdout). This uses [[AnyRef.toString]] on
-   * each element.
-   * This triggers execute() automatically.
+   * Prints the elements in a DataSet to the standard output stream [[System.out]] of the
+   * JVM that calls the print() method. For programs that are executed in a cluster, this
+   * method needs to gather the contents of the DataSet back to the client, to print it
+   * there.
+   *
+   * The string written for each element is defined by the [[AnyRef.toString]] method.
+   *
+   * This method immediately triggers the program execution, similar to the
+   * [[collect()]] and [[count()]] methods.
    */
-  def print() = {
+  def print(): Unit = {
     javaSet.print()
   }
-
+  
+  /**
+   * Prints the elements in a DataSet to the standard error stream [[System.err]] of the
+   * JVM that calls the print() method. For programs that are executed in a cluster, this
+   * method needs to gather the contents of the DataSet back to the client, to print it
+   * there.
+   *
+   * The string written for each element is defined by the [[AnyRef.toString]] method.
+   *
+   * This method immediately triggers the program execution, similar to the
+   * [[collect()]] and [[count()]] methods.
+   */
+  def printToErr(): Unit = {
+    javaSet.printToErr()
+  }
+  
   /**
    * *
    * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
@@ -1340,14 +1361,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on
-   * each element.
-   */
-  def printToErr() = {
-    javaSet.printToErr()
-  }
-
-  /**
    * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed.
    * This uses [[AnyRef.toString]] on each element.
    * @param sinkIdentifier The string to prefix the output with.

http://git-wip-us.apache.org/repos/asf/flink/blob/6220f34b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6cc327a..e01ff3b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -130,11 +130,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getId: UUID = {
     javaEnv.getId
   }
-
-
+  
   /**
-   * retrieves JobExecutionResult from last job execution (for "eager" print)
-   * @return JobExecutionResult form last job execution
+   * Gets the JobExecutionResult of the last executed job.
    */
   def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult