You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/05 23:03:42 UTC

[3/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from Record API to Java API

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index f42eb02..41e0eb9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -52,7 +53,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 					.where(0,1).equalTo(0,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -73,7 +74,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -96,7 +97,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(0,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -119,7 +120,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -142,7 +143,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -164,7 +165,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -187,7 +188,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -212,7 +213,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(0,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -238,7 +239,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -263,7 +264,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -288,7 +289,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -313,7 +314,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -338,7 +339,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(1,2).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -363,7 +364,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(1,2).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -384,7 +385,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(0,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -405,7 +406,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -428,7 +429,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(0,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -451,7 +452,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -473,7 +474,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 								.withForwardedFields("2;1"))
 				.where(0,1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -495,7 +496,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -517,7 +518,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 								.withForwardedFields("2"))
 				.where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -541,7 +542,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("0;1"))
 				.where(0, 1).equalTo(0, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -566,7 +567,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1;2"))
 				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -590,7 +591,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("2;1"))
 				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -614,7 +615,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1"))
 				.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -638,7 +639,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1"))
 				.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -662,7 +663,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("2"))
 				.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -686,7 +687,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1"))
 				.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 84f6377..68e8a41 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -50,7 +52,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 										.map(new IdentityMapper<Long>())
 											.withBroadcastSet(source, "bc");
 			
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -84,7 +86,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 							.withBroadcastSet(bcInput1, "bc1")
 							.withBroadcastSet(bcInput2, "bc2");
 			
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -123,7 +125,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 							.withBroadcastSet(bcInput1, "bc1");
 							
 			
-			iteration.closeWith(result).print();
+			iteration.closeWith(result).output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -154,7 +156,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 				
 				
 				Plan p = env.createProgramPlan();
@@ -176,7 +178,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 				
 				
 				Plan p = env.createProgramPlan();
@@ -199,7 +201,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 				
 				
 				Plan p = env.createProgramPlan();
@@ -222,7 +224,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 				
 				
 				Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index d6b9444..dc9f2e5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -64,7 +65,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -97,7 +98,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy(1, 0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -129,7 +130,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("*");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -161,7 +162,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("f1");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -193,7 +194,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("f1.stringField");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -225,7 +226,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("f1.intField; f2");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -258,7 +259,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("byDate", 1, 0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -293,7 +294,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0)
 				.splitsGroupedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -327,7 +328,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0)
 				.splitsGroupedBy(1, 0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -362,7 +363,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(1)
 				.splitsGroupedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -396,7 +397,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0, 1)
 				.splitsGroupedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -429,7 +430,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f2")
 				.splitsGroupedBy("f2");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -463,7 +464,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1.intField")
 				.splitsGroupedBy("f0; f1.intField");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -497,7 +498,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1.intField")
 				.splitsGroupedBy("f1");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -530,7 +531,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1")
 				.splitsGroupedBy("f1.stringField");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -565,7 +566,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(1)
 				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -599,7 +600,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(1)
 				.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -634,7 +635,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0)
 				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -668,7 +669,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0, 1)
 				.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -701,7 +702,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 			.splitsPartitionedBy("f1.intField")
 			.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -735,7 +736,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1.intField")
 				.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -768,7 +769,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1")
 				.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -808,7 +809,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data2.getSplitDataProperties()
 				.splitsPartitionedBy("byDate", 0);
 
-		data1.union(data2).print();
+		data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -856,7 +857,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data2.getSplitDataProperties()
 				.splitsPartitionedBy("byDate", 0);
 
-		data1.union(data2).print();
+		data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 3af64fc..b0dca66 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -20,36 +20,35 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
 import org.junit.Test;
 
 /**
  * This test case has been created to validate a bug that occurred when
  * the ReduceOperator was used without a grouping key.
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class ReduceAllTest extends CompilerTestBase {
 
 	@Test
 	public void testReduce() {
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build();
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setInput(reduce1);
-		Plan plan = new Plan(sink, "AllReduce Test");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+
+		set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+				.output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
+
 		try {
 			OptimizedPlan oPlan = compileNoStats(plan);
 			JobGraphGenerator jobGen = new JobGraphGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 25643a4..26af380 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial", "unchecked"})
 public class ReplicatingDataSourceTest extends CompilerTestBase {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 3a24ce1..00ada2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -57,7 +58,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 				.groupBy(1)
 				.reduce(new MockReducer()).withForwardedFields("*");
 
-		set.print();
+		set.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -118,7 +119,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 				.reduce(new MockReducer()).withForwardedFields("f1->f2");
 		DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
 
-		out.print();
+		out.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 65e5025..a94f845 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -51,12 +52,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
 				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(0, 1)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(0)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-				
-				.print();
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -96,12 +97,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
 				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(0, 1)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(1)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-				
-				.print();
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 1001626..f041b2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -50,8 +51,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
 			DataSet<Long> result = iteration.closeWith(
 					input2.union(input2).union(iteration.union(iteration)));
 				
-			result.print();
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -102,8 +103,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
 			DataSet<Long> iterResult = iteration
 				.closeWith(iteration.union(iteration).union(input2.union(input2)));
 			
-			iterResult.print();
-			iterResult.print();
+			iterResult.output(new DiscardingOutputFormat<Long>());
+			iterResult.output(new DiscardingOutputFormat<Long>());
 			
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 2e52565..fee6e17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;
 import org.junit.Assert;
@@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
 
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class UnionPropertyPropagationTest extends CompilerTestBase {
 
-	@SuppressWarnings("unchecked")
 	@Test
-	public void testUnionPropertyOldApiPropagation() {
+	public void testUnion1() {
 		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+		DataSet<Long> sourceB = env.generateSequence(0,1);
 
-		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		
-		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceA)
-			.build();
-		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceB)
-			.build();
-		
-		ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build();
-		globalRed.addInput(redA);
-		globalRed.addInput(redB);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed);
-		
-		// return the plan
-		Plan plan = new Plan(sink, "Union Property Propagation");
+		DataSet<Long> redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+		DataSet<Long> redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+
+		redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+			.output(new DiscardingOutputFormat<Long>());
+
+		JavaPlan plan = env.createProgramPlan();
 		
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
@@ -88,7 +76,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 			
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
+				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
 					for (Channel inConn : visitable.getInputs()) {
 						Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
 								inConn.getShipStrategy() == ShipStrategyType.FORWARD); 
@@ -107,7 +95,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 	}
 	
 	@Test
-	public void testUnionNewApiAssembly() {
+	public void testUnion2() {
 		final int NUM_INPUTS = 4;
 		
 		// construct the plan it will be multiple flat maps, all unioned

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index e81e0ec..65dd2b3 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -40,8 +41,8 @@ public class UnionReplacementTest extends CompilerTestBase {
 	
 			DataSet<String> union = input1.union(input2);
 	
-			union.print();
-			union.print();
+			union.output(new DiscardingOutputFormat<String>());
+			union.output(new DiscardingOutputFormat<String>());
 	
 			Plan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 321ca5a..32bd6e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
@@ -40,12 +42,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
-			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
+			DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
 			
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
 			
-			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
-			iteration.closeWith(iterEnd, iterEnd).print();
+			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			iteration.closeWith(iterEnd, iterEnd)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -61,18 +64,5 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
-	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
-		@Override
-		public Tuple2<T, T> map(T value) {
-			return new Tuple2<T, T>(value, value);
-		}
-	}
-	
-	private static final class TestMapper<T> implements MapFunction<T, T> {
-		@Override
-		public T map(T value) {
-			return value;
-		}
-	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 27f367f..46b9357 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -25,25 +25,21 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.LongValue;
 import org.junit.Test;
 
 
@@ -51,7 +47,6 @@ import org.junit.Test;
 * Tests that validate optimizer choices when using operators that are requesting certain specific execution
 * strategies.
 */
-@SuppressWarnings("deprecation")
 public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 	
 	private static final long serialVersionUID = 1L;
@@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 
 	@Test
 	public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
-		Plan plan = getRecordTestPlan(false, true);
+		Plan plan = getTestPlan(false, true);
 		
 		OptimizedPlan oPlan;
 		try {
@@ -112,7 +107,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 	
 	@Test
 	public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
-		Plan plan = getRecordTestPlan(false, false);
+		Plan plan = getTestPlan(false, false);
 		
 		OptimizedPlan oPlan;
 		try {
@@ -156,7 +151,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 	
 	@Test
 	public void testRecordApiWithDirectSoltionSetUpdate() {
-		Plan plan = getRecordTestPlan(true, false);
+		Plan plan = getTestPlan(true, false);
 		
 		OptimizedPlan oPlan;
 		try {
@@ -197,52 +192,45 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		new JobGraphGenerator().compileJobGraph(oPlan);
 	}
 	
-	private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
-		FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set");
-		FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset");
-		
-		FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input");
-		
-		DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME);
-		iteration.setInitialSolutionSet(solutionSetInput);
-		iteration.setInitialWorkset(worksetInput);
-		iteration.setMaximumNumberOfIterations(100);
-
-		JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0)
-				.input1(iteration.getWorkset())
-				.input2(invariantInput)
-				.name(JOIN_WITH_INVARIANT_NAME)
-				.build();
-
-		JoinOperator joinWithSolutionSet = JoinOperator.builder(
-				joinPreservesSolutionSet ? new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0)
-				.input1(iteration.getSolutionSet())
-				.input2(joinWithInvariant)
-				.name(JOIN_WITH_SOLUTION_SET)
-				.build();
-		
-		ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0)
-				.input(joinWithSolutionSet)
-				.name(NEXT_WORKSET_REDUCER_NAME)
-				.build();
-		
-		if (mapBeforeSolutionDelta) {
-			MapOperator mapper = MapOperator.builder(new IdentityMap())
-				.input(joinWithSolutionSet)
-				.name(SOLUTION_DELTA_MAPPER_NAME)
-				.build();
-			iteration.setSolutionSetDelta(mapper);
-		} else {
-			iteration.setSolutionSetDelta(joinWithSolutionSet);
+	private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
+
+		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
+		DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
+		DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);
+
+		DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>())
+				.withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);
+
+		DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>())
+				.name(JOIN_WITH_SOLUTION_SET);
+		if(joinPreservesSolutionSet) {
+			((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*");
 		}
-		
-		iteration.setNextWorkset(nextWorkset);
 
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		return plan;
+		DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
+				.withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
+
+		if(mapBeforeSolutionDelta) {
+
+			DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
+					.withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);
+
+			deltaIt.closeWith(mapper, nextWorkset)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+		}
+		else {
+			deltaIt.closeWith(join2, nextWorkset)
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		}
+
+		return env.createProgramPlan();
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index d52181d..346e702 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -246,7 +246,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 				.distinct(0, 1)
 				.groupBy(1)
 				.sortGroup(0, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
 			
 			grouped
 				.coGroup(partitioned).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 5758c86..17a7659 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
 				.withPartitioner(partitioner);
 
 			joined.groupBy(1).withPartitioner(partitioner)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
 				.print();
 
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 0408ca9..23f4812 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -84,7 +84,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
 				.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 74e5c8c..54033ac 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -76,7 +76,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 					.rebalance().setParallelism(4);
 			
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Pojo2>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -106,7 +106,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
 				.sortGroup("b", Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Pojo3>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -137,7 +137,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
 				.sortGroup("b", Order.ASCENDING)
 				.sortGroup("c", Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducer<Pojo4>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 72fb81b..49f44f5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -108,7 +108,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 					.rebalance().setParallelism(4);
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -138,7 +138,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -169,7 +169,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
 				.sortGroup(2, Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index 8eedee1..ff429b8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -243,7 +243,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 				.distinct(0, 1)
 				.groupBy(1)
 				.sortGroup(0, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
 			
 			grouped
 				.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 95ee4de..9c2d0d2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class PartitionOperatorTest extends CompilerTestBase {
 					public int partition(Long key, int numPartitions) { return key.intValue(); }
 				}, 1)
 				.groupBy(1)
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
new file mode 100644
index 0000000..9d8ac2e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> {
+	
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
new file mode 100644
index 0000000..54b2785
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+
+public class IdentityCrosser<T> implements CrossFunction<T, T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T cross(T first, T second) {
+		return first;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index 11fd044..da4ef17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
-
-@Combinable
 public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
new file mode 100644
index 0000000..ce24bb6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+@Combinable
+public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void reduce(Iterable<T> values, Collector<T> out) {
+		for (T next : values) {
+			out.collect(next);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
new file mode 100644
index 0000000..faca2ce
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+
+public class IdentityJoiner<T> implements JoinFunction<T, T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T join(T first, T second) {
+		return first;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-		while (records1.hasNext()) {
-			out.collect(records1.next());
-		}
-
-		while (records2.hasNext()) {
-			out.collect(records2.next());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Record cross(Record first, Record second) throws Exception {
-		return first;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final IntValue integer = new IntValue(1);
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		target.setField(0, this.integer);
-		target.setField(1, this.integer);
-		return target;
-	}
-
-	@Override
-	public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public int serializeRecord(Record rec, byte[] target) throws Exception {
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void map(Record record, Collector<Record> out) throws Exception {
-		out.collect(record);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-		while (records.hasNext()) {
-			out.collect(records.next());
-		}
-	}
-}