Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/05 23:03:40 UTC

[1/4] flink git commit: [FLINK-1951] Fix NullPointerException in delta iteration due to missing temp

Repository: flink
Updated Branches:
  refs/heads/master 60ec68308 -> bd96ba8d1


[FLINK-1951] Fix NullPointerException in delta iteration due to missing temp

This closes #641
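
For reference, the kind of program affected looks roughly like the sketch below. It mirrors the new TempInIterationsTest added by this commit; the class name is made up for illustration, and it assumes the flink-optimizer test helper DummyFlatJoinFunction is on the classpath. Before this fix, the input of the solution-set delta inside such an iteration could miss its temp barrier, leading to the NullPointerException reported in FLINK-1951.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;

public class DeltaIterationTempExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// two-column input of longs; the path is a placeholder since the plan is only compiled here
		DataSet<Tuple2<Long, Long>> input =
				env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class);

		// delta iteration keyed on field 0, at most one superstep
		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
				input.iterateDelta(input, 1, 0);

		// the join result serves as both the solution-set delta and the next workset
		DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
				.join(iteration.getSolutionSet()).where(0).equalTo(0)
				.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());

		iteration.closeWith(update, update)
				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

		// build the plan only, as the test does; no execution is triggered
		env.createProgramPlan();
	}
}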


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adb321d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adb321d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adb321d6

Branch: refs/heads/master
Commit: adb321d61cc783b3a2a78f4e707104d75e1d63c0
Parents: 60ec683
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Apr 30 17:34:02 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 5 22:56:36 2015 +0200

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  5 +-
 .../plantranslate/TempInIterationsTest.java     | 81 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index dc21c13..2630019 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -1163,8 +1163,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			final TempMode tm = channel.getTempMode();
 
 			boolean needsMemory = false;
-			// Don't add a pipeline breaker if the data exchange is already blocking.
-			if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
+			// Don't add a pipeline breaker if the data exchange is already blocking, EXCEPT if the channel is within an iteration.
+			if (tm.breaksPipeline() &&
+					(channel.isOnDynamicPath() || channel.getDataExchangeMode() != DataExchangeMode.BATCH) ) {
 				config.setInputAsynchronouslyMaterialized(inputNum, true);
 				needsMemory = true;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/adb321d6/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
new file mode 100644
index 0000000..15cb03f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/plantranslate/TempInIterationsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TempInIterationsTest {
+
+	/*
+	 * Tests whether temp barriers are correctly set within iterations
+	 */
+	@Test
+	public void testTempInIterationTest() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<Long, Long>> input = env.readCsvFile("file:///does/not/exist").types(Long.class, Long.class);
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				input.iterateDelta(input, 1, 0);
+
+		DataSet<Tuple2<Long, Long>> update = iteration.getWorkset()
+				.join(iteration.getSolutionSet()).where(0).equalTo(0)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+		iteration.closeWith(update, update)
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = (new Optimizer(new Configuration())).compile(plan);
+
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jg = jgg.compileJobGraph(oPlan);
+
+		boolean solutionSetUpdateChecked = false;
+		for(AbstractJobVertex v : jg.getVertices()) {
+			if(v.getName().equals("SolutionSet Delta")) {
+
+				// check if input of solution set delta is temped
+				TaskConfig tc = new TaskConfig(v.getConfiguration());
+				assertTrue(tc.isInputAsynchronouslyMaterialized(0));
+				solutionSetUpdateChecked = true;
+			}
+		}
+		assertTrue(solutionSetUpdateChecked);
+
+	}
+
+}


[3/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from Record API to Java API

Posted by fh...@apache.org.
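
The bulk of this commit replaces the print() sinks in the optimizer tests with explicit discarding sinks, so each test program still ends in a proper data sink when its plan is compiled, without relying on print(). A minimal sketch of the recurring pattern, using only types that appear in the diffs below (the class name is made up for illustration):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.JavaPlan;

public class DiscardingSinkPattern {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Long> data = env.generateSequence(0, 10);

		// before the port, the tests ended with: data.print();
		// after the port, an explicit sink discards all records, so
		// createProgramPlan() sees a complete program without printing anything
		data.output(new DiscardingOutputFormat<Long>());

		JavaPlan plan = env.createProgramPlan();
		// in the tests, the plan is then handed to the optimizer,
		// e.g. OptimizedPlan oPlan = compileWithStats(plan);
	}
}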
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index f42eb02..41e0eb9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -52,7 +53,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 					.where(0,1).equalTo(0,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -73,7 +74,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -96,7 +97,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(0,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -119,7 +120,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -142,7 +143,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -164,7 +165,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -187,7 +188,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -212,7 +213,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(0,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -238,7 +239,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -263,7 +264,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,1).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -288,7 +289,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -313,7 +314,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(2,1).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -338,7 +339,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(1,2).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -363,7 +364,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
 				.where(0,2).equalTo(1,2).with(new MockJoin());
 
-		joined.print();
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -384,7 +385,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(0,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -405,7 +406,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -428,7 +429,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(0,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -451,7 +452,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -473,7 +474,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 								.withForwardedFields("2;1"))
 				.where(0,1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -495,7 +496,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 				.coGroup(set2)
 				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -517,7 +518,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 								.withForwardedFields("2"))
 				.where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -541,7 +542,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("0;1"))
 				.where(0, 1).equalTo(0, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -566,7 +567,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1;2"))
 				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -590,7 +591,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("2;1"))
 				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -614,7 +615,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1"))
 				.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -638,7 +639,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1"))
 				.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -662,7 +663,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("2"))
 				.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -686,7 +687,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
 						.withForwardedFields("1"))
 				.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
 
-		coGrouped.print();
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 84f6377..68e8a41 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -50,7 +52,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 										.map(new IdentityMapper<Long>())
 											.withBroadcastSet(source, "bc");
 			
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -84,7 +86,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 							.withBroadcastSet(bcInput1, "bc1")
 							.withBroadcastSet(bcInput2, "bc2");
 			
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -123,7 +125,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 							.withBroadcastSet(bcInput1, "bc1");
 							
 			
-			iteration.closeWith(result).print();
+			iteration.closeWith(result).output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -154,7 +156,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 				
 				
 				Plan p = env.createProgramPlan();
@@ -176,7 +178,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 				
 				
 				Plan p = env.createProgramPlan();
@@ -199,7 +201,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 				
 				
 				Plan p = env.createProgramPlan();
@@ -222,7 +224,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
 				initialSource
 					.map(new IdentityMapper<Long>())
 					.cross(initialSource).withParameters(conf)
-					.print();
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 				
 				
 				Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index d6b9444..dc9f2e5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -64,7 +65,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -97,7 +98,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy(1, 0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -129,7 +130,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("*");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -161,7 +162,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("f1");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -193,7 +194,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("f1.stringField");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -225,7 +226,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("f1.intField; f2");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -258,7 +259,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data.getSplitDataProperties()
 				.splitsPartitionedBy("byDate", 1, 0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -293,7 +294,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0)
 				.splitsGroupedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -327,7 +328,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0)
 				.splitsGroupedBy(1, 0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -362,7 +363,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(1)
 				.splitsGroupedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -396,7 +397,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0, 1)
 				.splitsGroupedBy(0);
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -429,7 +430,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f2")
 				.splitsGroupedBy("f2");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -463,7 +464,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1.intField")
 				.splitsGroupedBy("f0; f1.intField");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -497,7 +498,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1.intField")
 				.splitsGroupedBy("f1");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -530,7 +531,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1")
 				.splitsGroupedBy("f1.stringField");
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -565,7 +566,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(1)
 				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -599,7 +600,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(1)
 				.splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -634,7 +635,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0)
 				.splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -668,7 +669,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy(0, 1)
 				.splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -701,7 +702,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 			.splitsPartitionedBy("f1.intField")
 			.splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -735,7 +736,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1.intField")
 				.splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -768,7 +769,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 				.splitsPartitionedBy("f1")
 				.splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
 
-		data.print();
+		data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -808,7 +809,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data2.getSplitDataProperties()
 				.splitsPartitionedBy("byDate", 0);
 
-		data1.union(data2).print();
+		data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 
@@ -856,7 +857,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
 		data2.getSplitDataProperties()
 				.splitsPartitionedBy("byDate", 0);
 
-		data1.union(data2).print();
+		data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
 		JavaPlan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 3af64fc..b0dca66 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -20,36 +20,35 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
 import org.junit.Test;
 
 /**
  * This test case has been created to validate a bug that occurred when
  * the ReduceOperator was used without a grouping key.
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class ReduceAllTest extends CompilerTestBase {
 
 	@Test
 	public void testReduce() {
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build();
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setInput(reduce1);
-		Plan plan = new Plan(sink, "AllReduce Test");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+
+		set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+				.output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
+
 		try {
 			OptimizedPlan oPlan = compileNoStats(plan);
 			JobGraphGenerator jobGen = new JobGraphGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 25643a4..26af380 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial", "unchecked"})
 public class ReplicatingDataSourceTest extends CompilerTestBase {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 3a24ce1..00ada2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -57,7 +58,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 				.groupBy(1)
 				.reduce(new MockReducer()).withForwardedFields("*");
 
-		set.print();
+		set.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -118,7 +119,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
 				.reduce(new MockReducer()).withForwardedFields("f1->f2");
 		DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
 
-		out.print();
+		out.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
 		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileWithStats(plan);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 65e5025..a94f845 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -51,12 +52,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
 				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(0, 1)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(0)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-				
-				.print();
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -96,12 +97,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
 				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(0, 1)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
 				
 				.groupBy(1)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-				
-				.print();
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 1001626..f041b2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -50,8 +51,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
 			DataSet<Long> result = iteration.closeWith(
 					input2.union(input2).union(iteration.union(iteration)));
 				
-			result.print();
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -102,8 +103,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
 			DataSet<Long> iterResult = iteration
 				.closeWith(iteration.union(iteration).union(input2.union(input2)));
 			
-			iterResult.print();
-			iterResult.print();
+			iterResult.output(new DiscardingOutputFormat<Long>());
+			iterResult.output(new DiscardingOutputFormat<Long>());
 			
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 2e52565..fee6e17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;
 import org.junit.Assert;
@@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
 
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class UnionPropertyPropagationTest extends CompilerTestBase {
 
-	@SuppressWarnings("unchecked")
 	@Test
-	public void testUnionPropertyOldApiPropagation() {
+	public void testUnion1() {
 		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+		DataSet<Long> sourceB = env.generateSequence(0,1);
 
-		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		
-		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceA)
-			.build();
-		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceB)
-			.build();
-		
-		ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build();
-		globalRed.addInput(redA);
-		globalRed.addInput(redB);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed);
-		
-		// return the plan
-		Plan plan = new Plan(sink, "Union Property Propagation");
+		DataSet<Long> redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+		DataSet<Long> redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+
+		redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+			.output(new DiscardingOutputFormat<Long>());
+
+		JavaPlan plan = env.createProgramPlan();
 		
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
@@ -88,7 +76,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 			
 			@Override
 			public boolean preVisit(PlanNode visitable) {
-				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
+				if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
 					for (Channel inConn : visitable.getInputs()) {
 						Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
 								inConn.getShipStrategy() == ShipStrategyType.FORWARD); 
@@ -107,7 +95,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
 	}
 	
 	@Test
-	public void testUnionNewApiAssembly() {
+	public void testUnion2() {
 		final int NUM_INPUTS = 4;
 		
 		// construct the plan it will be multiple flat maps, all unioned

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index e81e0ec..65dd2b3 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -40,8 +41,8 @@ public class UnionReplacementTest extends CompilerTestBase {
 	
 			DataSet<String> union = input1.union(input2);
 	
-			union.print();
-			union.print();
+			union.output(new DiscardingOutputFormat<String>());
+			union.output(new DiscardingOutputFormat<String>());
 	
 			Plan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 321ca5a..32bd6e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
@@ -40,12 +42,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
-			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
+			DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
 			
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
 			
-			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
-			iteration.closeWith(iterEnd, iterEnd).print();
+			DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+			iteration.closeWith(iterEnd, iterEnd)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -61,18 +64,5 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
-	
-	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
-		@Override
-		public Tuple2<T, T> map(T value) {
-			return new Tuple2<T, T>(value, value);
-		}
-	}
-	
-	private static final class TestMapper<T> implements MapFunction<T, T> {
-		@Override
-		public T map(T value) {
-			return value;
-		}
-	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 27f367f..46b9357 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -25,25 +25,21 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.LongValue;
 import org.junit.Test;
 
 
@@ -51,7 +47,6 @@ import org.junit.Test;
 * Tests that validate optimizer choices when using operators that are requesting certain specific execution
 * strategies.
 */
-@SuppressWarnings("deprecation")
 public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 	
 	private static final long serialVersionUID = 1L;
@@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 
 	@Test
 	public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
-		Plan plan = getRecordTestPlan(false, true);
+		Plan plan = getTestPlan(false, true);
 		
 		OptimizedPlan oPlan;
 		try {
@@ -112,7 +107,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 	
 	@Test
 	public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
-		Plan plan = getRecordTestPlan(false, false);
+		Plan plan = getTestPlan(false, false);
 		
 		OptimizedPlan oPlan;
 		try {
@@ -156,7 +151,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 	
 	@Test
 	public void testRecordApiWithDirectSoltionSetUpdate() {
-		Plan plan = getRecordTestPlan(true, false);
+		Plan plan = getTestPlan(true, false);
 		
 		OptimizedPlan oPlan;
 		try {
@@ -197,52 +192,45 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		new JobGraphGenerator().compileJobGraph(oPlan);
 	}
 	
-	private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
-		FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set");
-		FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset");
-		
-		FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input");
-		
-		DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME);
-		iteration.setInitialSolutionSet(solutionSetInput);
-		iteration.setInitialWorkset(worksetInput);
-		iteration.setMaximumNumberOfIterations(100);
-
-		JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0)
-				.input1(iteration.getWorkset())
-				.input2(invariantInput)
-				.name(JOIN_WITH_INVARIANT_NAME)
-				.build();
-
-		JoinOperator joinWithSolutionSet = JoinOperator.builder(
-				joinPreservesSolutionSet ? new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0)
-				.input1(iteration.getSolutionSet())
-				.input2(joinWithInvariant)
-				.name(JOIN_WITH_SOLUTION_SET)
-				.build();
-		
-		ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0)
-				.input(joinWithSolutionSet)
-				.name(NEXT_WORKSET_REDUCER_NAME)
-				.build();
-		
-		if (mapBeforeSolutionDelta) {
-			MapOperator mapper = MapOperator.builder(new IdentityMap())
-				.input(joinWithSolutionSet)
-				.name(SOLUTION_DELTA_MAPPER_NAME)
-				.build();
-			iteration.setSolutionSetDelta(mapper);
-		} else {
-			iteration.setSolutionSetDelta(joinWithSolutionSet);
+	private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
+
+		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
+		DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
+		DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);
+
+		DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>())
+				.withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);
+
+		DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>())
+				.name(JOIN_WITH_SOLUTION_SET);
+		if(joinPreservesSolutionSet) {
+			((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*");
 		}
-		
-		iteration.setNextWorkset(nextWorkset);
 
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		return plan;
+		DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
+				.withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
+
+		if (mapBeforeSolutionDelta) {
+
+			DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
+					.withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);
+
+			deltaIt.closeWith(mapper, nextWorkset)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+		}
+		else {
+			deltaIt.closeWith(join2, nextWorkset)
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		}
+
+		return env.createProgramPlan();
 	}
 }
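
For context, the plan produced by getTestPlan(...) is exercised the same way the Record API version was: it is compiled by the optimizer and then translated to a JobGraph. A minimal test-style sketch of that flow, assuming the CompilerTestBase helper compileNoStats(...) and the JobGraphGenerator used elsewhere in this test class; the test method name is illustrative and not taken from this commit:

	@Test
	public void translateIterationPlanSketch() {
		// build the ported Java API plan (flags chosen arbitrarily for the sketch)
		Plan plan = getTestPlan(false, true);

		// optimize without statistics, as the other tests in this class do
		OptimizedPlan oPlan = compileNoStats(plan);

		// translating the optimized plan to a JobGraph must succeed without errors
		new JobGraphGenerator().compileJobGraph(oPlan);
	}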
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index d52181d..346e702 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -246,7 +246,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 				.distinct(0, 1)
 				.groupBy(1)
 				.sortGroup(0, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
 			
 			grouped
 				.coGroup(partitioned).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 5758c86..17a7659 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
 				.withPartitioner(partitioner);
 
 			joined.groupBy(1).withPartitioner(partitioner)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
 				.print();
 
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 0408ca9..23f4812 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -84,7 +84,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
 				.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 74e5c8c..54033ac 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -76,7 +76,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 					.rebalance().setParallelism(4);
 			
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Pojo2>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -106,7 +106,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
 				.sortGroup("b", Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Pojo3>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -137,7 +137,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
 				.sortGroup("b", Order.ASCENDING)
 				.sortGroup("c", Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducer<Pojo4>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 72fb81b..49f44f5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -108,7 +108,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 					.rebalance().setParallelism(4);
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -138,7 +138,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();
@@ -169,7 +169,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
 				.sortGroup(2, Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index 8eedee1..ff429b8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -243,7 +243,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 				.distinct(0, 1)
 				.groupBy(1)
 				.sortGroup(0, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
 			
 			grouped
 				.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 95ee4de..9c2d0d2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class PartitionOperatorTest extends CompilerTestBase {
 					public int partition(Long key, int numPartitions) { return key.intValue(); }
 				}, 1)
 				.groupBy(1)
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
 				.print();
 			
 			Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
new file mode 100644
index 0000000..9d8ac2e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> {
+	
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
new file mode 100644
index 0000000..54b2785
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+
+public class IdentityCrosser<T> implements CrossFunction<T, T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T cross(T first, T second) {
+		return first;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index 11fd044..da4ef17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
-
-@Combinable
 public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
new file mode 100644
index 0000000..ce24bb6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+@Combinable
+public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void reduce(Iterable<T> values, Collector<T> out) {
+		for (T next : values) {
+			out.collect(next);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
new file mode 100644
index 0000000..faca2ce
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+
+public class IdentityJoiner<T> implements JoinFunction<T, T, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T join(T first, T second) {
+		return first;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-		while (records1.hasNext()) {
-			out.collect(records1.next());
-		}
-
-		while (records2.hasNext()) {
-			out.collect(records2.next());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Record cross(Record first, Record second) throws Exception {
-		return first;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	private final IntValue integer = new IntValue(1);
-
-	@Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-		target.setField(0, this.integer);
-		target.setField(1, this.integer);
-		return target;
-	}
-
-	@Override
-	public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-		out.collect(value1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public int serializeRecord(Record rec, byte[] target) throws Exception {
-		return 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void map(Record record, Collector<Record> out) throws Exception {
-		out.collect(record);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-		while (records.hasNext()) {
-			out.collect(records.next());
-		}
-	}
-}


[4/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from Record API to Java API

Posted by fh...@apache.org.
[FLINK-1682] Ported optimizer unit tests from Record API to Java API

This closes #627


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd96ba8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd96ba8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd96ba8d

Branch: refs/heads/master
Commit: bd96ba8d1bbdc494ac88b98a6469255572f4a9fc
Parents: adb321d
Author: Fabian Hueske <fh...@apache.org>
Authored: Sat Apr 25 01:30:11 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue May 5 23:01:47 2015 +0200

----------------------------------------------------------------------
 .../optimizer/AdditionalOperatorsTest.java      |  64 +--
 .../optimizer/BranchingPlansCompilerTest.java   | 509 +++++++------------
 .../BroadcastVariablePipelinebreakerTest.java   |   7 +-
 .../CachedMatchStrategyCompilerTest.java        |   7 +-
 .../optimizer/CoGroupSolutionSetFirstTest.java  |   3 +-
 .../flink/optimizer/DisjointDataFlowsTest.java  |   7 +-
 .../optimizer/DistinctCompilationTest.java      |  10 +-
 .../apache/flink/optimizer/GroupOrderTest.java  | 100 ++--
 .../optimizer/HardPlansCompilationTest.java     |  55 +-
 .../flink/optimizer/IterationsCompilerTest.java |  22 +-
 .../flink/optimizer/NestedIterationsTest.java   |   9 +-
 .../flink/optimizer/ParallelismChangeTest.java  | 250 ++++-----
 .../flink/optimizer/PartitionPushdownTest.java  |   5 +-
 .../optimizer/PartitioningReusageTest.java      |  57 ++-
 .../flink/optimizer/PipelineBreakerTest.java    |  16 +-
 .../flink/optimizer/PropertyDataSourceTest.java |  49 +-
 .../apache/flink/optimizer/ReduceAllTest.java   |  31 +-
 .../optimizer/ReplicatingDataSourceTest.java    |   2 +-
 .../SemanticPropertiesAPIToPlanTest.java        |   5 +-
 .../flink/optimizer/SortPartialReuseTest.java   |  19 +-
 .../UnionBetweenDynamicAndStaticPathTest.java   |   9 +-
 .../optimizer/UnionPropertyPropagationTest.java |  48 +-
 .../flink/optimizer/UnionReplacementTest.java   |   5 +-
 .../WorksetIterationCornerCasesTest.java        |  24 +-
 .../WorksetIterationsRecordApiCompilerTest.java | 110 ++--
 .../CoGroupCustomPartitioningTest.java          |   4 +-
 ...ustomPartitioningGlobalOptimizationTest.java |   4 +-
 .../GroupingKeySelectorTranslationTest.java     |   6 +-
 .../GroupingPojoTranslationTest.java            |   8 +-
 .../GroupingTupleTranslationTest.java           |   8 +-
 .../JoinCustomPartitioningTest.java             |   4 +-
 .../optimizer/java/PartitionOperatorTest.java   |   4 +-
 .../testfunctions/IdentityCoGrouper.java        |  30 ++
 .../testfunctions/IdentityCrosser.java          |  32 ++
 .../testfunctions/IdentityGroupReducer.java     |   2 -
 .../IdentityGroupReducerCombinable.java         |  37 ++
 .../optimizer/testfunctions/IdentityJoiner.java |  32 ++
 .../flink/optimizer/util/DummyCoGroupStub.java  |  42 --
 .../flink/optimizer/util/DummyCrossStub.java    |  32 --
 .../flink/optimizer/util/DummyInputFormat.java  |  42 --
 .../flink/optimizer/util/DummyMatchStub.java    |  37 --
 .../util/DummyNonPreservingMatchStub.java       |  35 --
 .../flink/optimizer/util/DummyOutputFormat.java |  34 --
 .../flink/optimizer/util/IdentityMap.java       |  37 --
 .../flink/optimizer/util/IdentityReduce.java    |  40 --
 .../flink/optimizer/util/OperatorResolver.java  |  15 +-
 46 files changed, 752 insertions(+), 1156 deletions(-)
----------------------------------------------------------------------
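
The conversions below all follow one recurring pattern: Record API plans assembled from FileDataSource, FileDataSink, and operator builders become Java API programs on an ExecutionEnvironment, which are turned into a plan via env.createProgramPlan(). A minimal sketch of the new style follows; it uses only classes that appear in the diffs below, and the class and method names are made up for illustration, not part of this commit.

	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.io.DiscardingOutputFormat;
	import org.apache.flink.api.java.operators.translation.JavaPlan;
	import org.apache.flink.optimizer.testfunctions.IdentityMapper;

	public class PortedPlanSketch {

		public static JavaPlan buildSketchPlan(int parallelism) {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
			env.setParallelism(parallelism);

			// Java API source replaces FileDataSource + DummyInputFormat
			DataSet<Long> input = env.generateSequence(0, 1);

			// a named operator replaces the Record API operator builders;
			// DiscardingOutputFormat replaces FileDataSink + DummyOutputFormat
			input.map(new IdentityMapper<Long>()).name("Map 1")
					.output(new DiscardingOutputFormat<Long>());

			// the program plan is what the optimizer tests hand to compileNoStats(...)
			return env.createProgramPlan();
		}
	}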


http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 0c50536..a4e74a9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -21,18 +21,14 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
-import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
@@ -41,27 +37,23 @@ import org.junit.Test;
 * Tests that validate optimizer choices when using operators that are requesting certain specific execution
 * strategies.
 */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class AdditionalOperatorsTest extends CompilerTestBase {
 
 	@Test
 	public void testCrossWithSmall() {
 		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-		
-		CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
-				.input1(source1).input2(source2)
-				.name("Cross").build();
-	
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+		DataSet<Long> set2 = env.generateSequence(0,1);
+
+		set1.crossWithTiny(set2).name("Cross")
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
 		try {
-			OptimizedPlan oPlan = compileNoStats(plan);
+			JavaPlan plan = env.createProgramPlan();
+			OptimizedPlan oPlan = compileWithStats(plan);
 			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
 			
 			DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
@@ -72,27 +64,23 @@ public class AdditionalOperatorsTest extends CompilerTestBase {
 			assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
 		} catch(CompilerException ce) {
 			ce.printStackTrace();
-			fail("The pact compiler is unable to compile this plan correctly.");
+			fail("The Flink optimizer is unable to compile this plan correctly.");
 		}
 	}
 	
 	@Test
 	public void testCrossWithLarge() {
 		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-		
-		CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub())
-				.input1(source1).input2(source2)
-				.name("Cross").build();
-	
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+		DataSet<Long> set2 = env.generateSequence(0,1);
+
+		set1.crossWithHuge(set2).name("Cross")
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
 		try {
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
 			OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 94ff41a..1d5b7c1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
+import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Collector;
@@ -41,15 +45,6 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -59,18 +54,8 @@ import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings({"serial", "deprecation"})
+
+@SuppressWarnings({"serial"})
 public class BranchingPlansCompilerTest extends CompilerTestBase {
 	
 	
@@ -323,84 +308,53 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchEachContractType() {
 		try {
 			// construct the plan
-			FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
-			FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
-			FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
-			
-			MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
-			
-			ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(map1)
-				.name("Reduce 1")
-				.build();
-			
-			JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(sourceB, sourceB, sourceC)
-				.input2(sourceC)
-				.name("Match 1")
-				.build();
-			;
-			CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(sourceA)
-				.input2(sourceB)
-				.name("CoGroup 1")
-				.build();
-			
-			CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
-				.input1(reduce1)
-				.input2(cogroup1)
-				.name("Cross 1")
-				.build();
-			
-			
-			CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cross1)
-				.input2(cross1)
-				.name("CoGroup 2")
-				.build();
-			
-			CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(map1)
-				.input2(match1)
-				.name("CoGroup 3")
-				.build();
-			
-			
-			MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
-			
-			CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(map2)
-				.input2(match1)
-				.name("CoGroup 4")
-				.build();
-			
-			CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cogroup2)
-				.input2(cogroup1)
-				.name("CoGroup 5")
-				.build();
-			
-			CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(reduce1)
-				.input2(cogroup4)
-				.name("CoGroup 6")
-				.build();
-			
-			CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
-				.input1(cogroup5)
-				.input2(cogroup6)
-				.name("CoGroup 7")
-				.build();
-			
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
-			sink.addInput(sourceA);
-			sink.addInput(cogroup3);
-			sink.addInput(cogroup4);
-			sink.addInput(cogroup1);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching of each contract type");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			DataSet<Long> sourceA = env.generateSequence(0,1);
+			DataSet<Long> sourceB = env.generateSequence(0,1);
+			DataSet<Long> sourceC = env.generateSequence(0,1);
+
+			DataSet<Long> map1 = sourceA.map(new IdentityMapper<Long>()).name("Map 1");
+
+			DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
+
+			DataSet<Long> join1 = sourceB.union(sourceB).union(sourceC)
+					.join(sourceC).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).name("Join 1");
+
+			DataSet<Long> coGroup1 = sourceA.coGroup(sourceB).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 1");
+
+			DataSet<Long> cross1 = reduce1.cross(coGroup1)
+					.with(new IdentityCrosser<Long>()).name("Cross 1");
+
+			DataSet<Long> coGroup2 = cross1.coGroup(cross1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 2");
+
+			DataSet<Long> coGroup3 = map1.coGroup(join1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 3");
+
+			DataSet<Long> map2 = coGroup3.map(new IdentityMapper<Long>()).name("Map 2");
+
+			DataSet<Long> coGroup4 = map2.coGroup(join1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 4");
+
+			DataSet<Long> coGroup5 = coGroup2.coGroup(coGroup1).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 5");
+
+			DataSet<Long> coGroup6 = reduce1.coGroup(coGroup4).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 6");
+
+			DataSet<Long> coGroup7 = coGroup5.coGroup(coGroup6).where("*").equalTo("*")
+					.with(new IdentityCoGrouper<Long>()).name("CoGroup 7");
+
+			coGroup7.union(sourceA)
+					.union(coGroup3)
+					.union(coGroup4)
+					.union(coGroup1)
+					.output(new DiscardingOutputFormat<Long>());
+
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
 			JobGraphGenerator jobGen = new JobGraphGenerator();
@@ -418,47 +372,33 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	public void testBranchingUnion() {
 		try {
 			// construct the plan
-			FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-			
-			JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(source1)
-				.input2(source2)
-				.name("Match 1")
-				.build();
-			
-			MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
-			
-			ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(ma1)
-				.name("Reduce 1")
-				.build();
-			
-			ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-				.input(mat1)
-				.name("Reduce 2")
-				.build();
-			
-			MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
-			
-			MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
-			
-			@SuppressWarnings("unchecked")
-			JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-				.input1(r1, r2, ma2, ma3)
-				.input2(ma2)
-				.name("Match 2")
-				.build();
-			mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
-			
-			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
-			
-			
-			// return the PACT plan
-			Plan plan = new Plan(sink, "Branching Union");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			DataSet<Long> source1 = env.generateSequence(0,1);
+			DataSet<Long> source2 = env.generateSequence(0,1);
+
+			DataSet<Long> join1 = source1.join(source2).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).name("Join 1");
+
+			DataSet<Long> map1 = join1.map(new IdentityMapper<Long>()).name("Map 1");
+
+			DataSet<Long> reduce1 = map1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 1");
+
+			DataSet<Long> reduce2 = join1.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce 2");
+
+			DataSet<Long> map2 = join1.map(new IdentityMapper<Long>()).name("Map 2");
+
+			DataSet<Long> map3 = map2.map(new IdentityMapper<Long>()).name("Map 3");
+
+			DataSet<Long> join2 = reduce1.union(reduce2).union(map2).union(map3)
+					.join(map2, JoinHint.REPARTITION_SORT_MERGE).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).name("Join 2");
+
+			join2.output(new DiscardingOutputFormat<Long>());
+
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
-			
+
 			JobGraphGenerator jobGen = new JobGraphGenerator();
 			
 			//Compile plan to verify that no error is thrown
@@ -480,22 +420,18 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	@Test
 	public void testBranchingWithMultipleDataSinksSmall() {
 		try {
+			String outPath1 = "/tmp/out1";
+			String outPath2 = "/tmp/out2";
+
 			// construct the plan
-			final String out1Path = "file:///test/1";
-			final String out2Path = "file:///test/2";
-	
-			FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-			
-			FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
-			FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
-			
-			List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-			sinks.add(sinkA);
-			sinks.add(sinkB);
-			
-			// return the PACT plan
-			Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			DataSet<Long> source1 = env.generateSequence(0,1);
+
+			source1.writeAsText(outPath1);
+			source1.writeAsText(outPath2);
+
+			JavaPlan plan = env.createProgramPlan();
 			OptimizedPlan oPlan = compileNoStats(plan);
 			
 			// ---------- check the optimizer plan ----------
@@ -505,15 +441,16 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			
 			// sinks contain all sink paths
 			Set<String> allSinks = new HashSet<String>();
-			allSinks.add(out1Path);
-			allSinks.add(out2Path);
+			allSinks.add(outPath1);
+			allSinks.add(outPath2);
 			
 			for (SinkPlanNode n : oPlan.getDataSinks()) {
-				String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
+				String path = ((TextOutputFormat<String>)n.getSinkNode().getOperator()
+						.getFormatWrapper().getUserCodeObject()).getOutputFilePath().toString();
 				Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
 			}
 			
-			// ---------- compile plan to nephele job graph to verify that no error is thrown ----------
+			// ---------- compile plan to job graph to verify that no error is thrown ----------
 			
 			JobGraphGenerator jobGen = new JobGraphGenerator();
 			jobGen.compileJobGraph(oPlan);
@@ -541,50 +478,38 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		final String out3Path = "file:///test/3";
 		final String out4Path = "file:///test/4";
 
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
-		FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
-		
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
-		sinks.add(sink4);
-		
-		// return the PACT plan
-		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
+		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+		DataSet<Long> sourceB = env.generateSequence(0,1);
+
+		sourceA.writeAsText(out1Path);
+		sourceB.writeAsText(out2Path);
+		sourceA.writeAsText(out3Path);
+		sourceB.writeAsText(out4Path);
+
+		JavaPlan plan = env.createProgramPlan();
 		compileNoStats(plan);
+
 	}
 	
 	@Test
 	public void testBranchAfterIteration() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(sourceA);
-		iteration.setMaximumNumberOfIterations(10);
-		
-		MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
-		iteration.setNextPartialSolution(mapper);
-		
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
-		
-		MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
-				.input(iteration).build();
-		
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		
-		Plan plan = new Plan(sinks);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+
+		IterativeDataSet<Long> loopHead = sourceA.iterate(10);
+		DataSet<Long> loopTail = loopHead.map(new IdentityMapper<Long>()).name("Mapper");
+		DataSet<Long> loopRes = loopHead.closeWith(loopTail);
+
+		loopRes.output(new DiscardingOutputFormat<Long>());
+		loopRes.map(new IdentityMapper<Long>())
+				.output(new DiscardingOutputFormat<Long>());
+
+		JavaPlan plan = env.createProgramPlan();
+
 		try {
 			compileNoStats(plan);
 		}
@@ -596,31 +521,20 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	
 	@Test
 	public void testBranchBeforeIteration() {
-		FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(source2);
-		iteration.setMaximumNumberOfIterations(10);
-		
-		MapOperator inMap = MapOperator.builder(new IdentityMap())
-				                       .input(source1)
-				                       .name("In Iteration Map")
-				                       .setBroadcastVariable("BC", iteration.getPartialSolution())
-				                       .build();
-		
-		iteration.setNextPartialSolution(inMap);
-		
-		MapOperator postMap = MapOperator.builder(new IdentityMap())
-										 .input(source1)
-										 .name("Post Iteration Map")
-										 .setBroadcastVariable("BC", iteration)
-										 .build();
-		
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
-		
-		Plan plan = new Plan(sink);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> source1 = env.generateSequence(0,1);
+		DataSet<Long> source2 = env.generateSequence(0,1);
+
+		IterativeDataSet<Long> loopHead = source2.iterate(10).name("Loop");
+		DataSet<Long> loopTail = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopHead, "BC").name("In-Loop Mapper");
+		DataSet<Long> loopRes = loopHead.closeWith(loopTail);
+
+		DataSet<Long> map = source1.map(new IdentityMapper<Long>()).withBroadcastSet(loopRes, "BC").name("Post-Loop Mapper");
+		map.output(new DiscardingOutputFormat<Long>());
+
+		JavaPlan plan = env.createProgramPlan();
+
 		try {
 			compileNoStats(plan);
 		}
@@ -644,31 +558,22 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testClosure() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
-
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(sourceA);
-		iteration.setMaximumNumberOfIterations(10);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> sourceA = env.generateSequence(0,1);
+		DataSet<Long> sourceB = env.generateSequence(0,1);
 
-		CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
-				input1(iteration.getPartialSolution()).
-				input2(sourceB).
-				build();
+		sourceA.output(new DiscardingOutputFormat<Long>());
+		sourceB.output(new DiscardingOutputFormat<Long>());
 
-		iteration.setNextPartialSolution(stepFunction);
+		IterativeDataSet<Long> loopHead = sourceA.iterate(10).name("Loop");
 
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+		DataSet<Long> loopTail = loopHead.cross(sourceB).with(new IdentityCrosser<Long>());
+		DataSet<Long> loopRes = loopHead.closeWith(loopTail);
 
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
+		loopRes.output(new DiscardingOutputFormat<Long>());
 
-		Plan plan = new Plan(sinks);
+		JavaPlan plan = env.createProgramPlan();
 
 		try{
 			compileNoStats(plan);
@@ -691,40 +596,24 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testClosureDeltaIteration() {
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-		FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
-
-		FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
-		FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
-
-		DeltaIteration iteration = new DeltaIteration(0, "Loop");
-		iteration.setInitialSolutionSet(sourceA);
-		iteration.setInitialWorkset(sourceB);
-		iteration.setMaximumNumberOfIterations(10);
-
-		CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
-				input1(iteration.getWorkset()).
-				input2(sourceC).
-				build();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(0,1).map(new Duplicator<Long>());
+		DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(0,1).map(new Duplicator<Long>());
+		DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(0,1).map(new Duplicator<Long>());
 
-		JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
-				name("Next solution set.").
-				input1(nextWorkset).
-				input2(iteration.getSolutionSet()).
-				build();
+		sourceA.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+		sourceC.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 
-		iteration.setNextWorkset(nextWorkset);
-		iteration.setSolutionSetDelta(solutionSetDelta);
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> loop = sourceA.iterateDelta(sourceB, 10, 0);
 
-		FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
+		DataSet<Tuple2<Long, Long>> workset = loop.getWorkset().cross(sourceB).with(new IdentityCrosser<Tuple2<Long, Long>>()).name("Next work set");
+		DataSet<Tuple2<Long, Long>> delta = workset.join(loop.getSolutionSet()).where(0).equalTo(0).with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Solution set delta");
 
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink1);
-		sinks.add(sink2);
-		sinks.add(sink3);
+		DataSet<Tuple2<Long, Long>> result = loop.closeWith(delta, workset);
+		result.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 
-		Plan plan = new Plan(sinks);
+		JavaPlan plan = env.createProgramPlan();
 
 		try{
 			compileNoStats(plan);
@@ -752,44 +641,26 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testDeltaIterationWithStaticInput() {
-		FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
-		MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
-				input(source).
-				name("Identity mapped source").
-				build();
-
-		ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
-				input(source).
-				name("Identity reduce source").
-				build();
-
-		DeltaIteration iteration = new DeltaIteration(0,"Loop");
-		iteration.setMaximumNumberOfIterations(10);
-		iteration.setInitialSolutionSet(source);
-		iteration.setInitialWorkset(mappedSource);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> source = env.generateSequence(0,1).map(new Duplicator<Long>());
 
-		JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,0).
-				input1(iteration.getWorkset()).
-				input2(reducedSource).
-				name("Next work set").
-				build();
+		DataSet<Tuple2<Long,Long>> map = source
+				.map(new IdentityMapper<Tuple2<Long, Long>>());
+		DataSet<Tuple2<Long,Long>> reduce = source
+				.reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>());
 
-		JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0,
-				0).
-				input1(iteration.getSolutionSet()).
-				input2(nextWorkset).
-				name("Solution set delta").
-				build();
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> loop = source.iterateDelta(map, 10, 0);
 
-		iteration.setNextWorkset(nextWorkset);
-		iteration.setSolutionSetDelta(solutionSetDelta);
+		DataSet<Tuple2<Long, Long>> workset = loop.getWorkset().join(reduce).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Next work set");
+		DataSet<Tuple2<Long, Long>> delta = loop.getSolutionSet().join(workset).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>()).name("Solution set delta");
 
-		FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sink);
+		DataSet<Tuple2<Long, Long>> result = loop.closeWith(delta, workset);
+		result.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 
-		Plan plan = new Plan(sinks);
+		JavaPlan plan = env.createProgramPlan();
 
 		try{
 			compileNoStats(plan);
@@ -871,7 +742,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.withBroadcastSet(input3, "bc1")
 				.withBroadcastSet(input1, "bc2")
 				.withBroadcastSet(result1, "bc3")
-			.print();
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -900,7 +771,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		IterativeDataSet<String> iteration = initialSolution.iterate(100);
 		
 		iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
-				.print();
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -927,9 +798,12 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		IterativeDataSet<String> iteration2 = input.iterate(20);
 		IterativeDataSet<String> iteration3 = input.iterate(17);
 		
-		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
-		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
-		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
+		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1"))
+				.output(new DiscardingOutputFormat<String>());
+		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2"))
+				.output(new DiscardingOutputFormat<String>());
+		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3"))
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -953,9 +827,12 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		IterativeDataSet<String> iteration3 = input.iterate(17);
 		
 		
-		iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print();
-		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print();
-		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print();
+		iteration1.closeWith(iteration1.map(new IdentityMapper<String>()))
+				.output(new DiscardingOutputFormat<String>());
+		iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()))
+				.output(new DiscardingOutputFormat<String>());
+		iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()))
+				.output(new DiscardingOutputFormat<String>());
 		
 		Plan plan = env.createProgramPlan();
 		
@@ -979,7 +856,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			input
 				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
 				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
-				.print();
+				.output(new DiscardingOutputFormat<Long>());
 			
 			Plan plan = env.createProgramPlan();
 			compileNoStats(plan);
@@ -1019,7 +896,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.map(new IdentityMapper<Tuple2<Long,Long>>())
 					.withBroadcastSet(bc_input1, "bc1")
 				.union(joinResult)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan plan = env.createProgramPlan();
 			compileNoStats(plan);

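The migrated tests above all follow the same DataSet-API pattern: build a program against a discarding sink, translate it with createProgramPlan(), and compile it. The following is a minimal, self-contained sketch of that pattern (not part of the commit); compileNoStats(), DEFAULT_PARALLELISM and IdentityMapper are assumed to come from the existing test harness (CompilerTestBase and the optimizer test functions).

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;

public class PlanCompilationSketch extends CompilerTestBase {

	@Test
	public void compileSimplePlan() {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(DEFAULT_PARALLELISM);

		// a trivial program: source -> map -> discarding sink (replaces the old print()/FileDataSink sinks)
		env.generateSequence(0, 1)
				.map(new IdentityMapper<Long>())
				.output(new DiscardingOutputFormat<Long>());

		// translate to a plan and compile it to verify that neither the optimizer
		// nor the job graph generator throws
		OptimizedPlan oPlan = compileNoStats(env.createProgramPlan());
		new JobGraphGenerator().compileJobGraph(oPlan);
	}
}
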
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
index 57c53ff..b0ecfe5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
@@ -43,7 +44,8 @@ public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
 			DataSet<String> source1 = env.fromElements("test");
 			DataSet<String> source2 = env.fromElements("test");
 			
-			source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
+			source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name")
+					.output(new DiscardingOutputFormat<String>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -66,7 +68,8 @@ public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
 				
 				DataSet<String> source1 = env.fromElements("test");
 				
-				source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
+				source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
+						.output(new DiscardingOutputFormat<String>());
 				
 				Plan p = env.createProgramPlan();
 				OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 1a4cd18..e795508 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -215,7 +216,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 		Configuration joinStrategy = new Configuration();
 		joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
 		
-		if(strategy != "") {
+		if(!strategy.equals("")) {
 			joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
 		}
 		
@@ -223,7 +224,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 
 		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
 		
-		output.print();
+		output.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 		
 		return env.createProgramPlan();
 		
@@ -250,7 +251,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 
 		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
 		
-		output.print();
+		output.output(new DiscardingOutputFormat<Tuple3<Long,Long,Long>>());
 		
 		return env.createProgramPlan();
 		

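The first hunk above also fixes a string comparison: in Java, != on Strings compares object references rather than contents, which is why the check becomes !strategy.equals(""). A standalone illustration of the difference (not from the commit):

public class StringEqualityDemo {
	public static void main(String[] args) {
		String interned = "";
		String constructed = new String("");   // same contents, different object

		System.out.println(interned != constructed);        // true  -> the old reference check misfires
		System.out.println(!interned.equals(constructed));  // false -> the contents are equal
	}
}
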
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
index 61d407a..f066b36 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
 import org.junit.Test;
@@ -64,7 +65,7 @@ public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
 		DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
 		DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
 
-		result.print();
+		result.output(new DiscardingOutputFormat<Tuple1<Integer>>());
 
 		Plan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index bb3aa47..68953c0 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -36,8 +37,10 @@ public class DisjointDataFlowsTest extends CompilerTestBase {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
 			// generate two different flows
-			env.generateSequence(1, 10).print();
-			env.generateSequence(1, 10).print();
+			env.generateSequence(1, 10)
+					.output(new DiscardingOutputFormat<Long>());
+			env.generateSequence(1, 10)
+					.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 7865861..5827d9c 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -50,7 +51,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 
 			data
 					.distinct().name("reducer")
-					.print().name("sink");
+					.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -85,7 +86,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -104,7 +104,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 					.distinct(new KeySelector<Tuple2<String,Double>, String>() {
 						public String getKey(Tuple2<String, Double> value) { return value.f0; }
 					}).name("reducer")
-					.print().name("sink");
+					.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -146,7 +146,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
@@ -164,7 +163,7 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			DistinctOperator<Tuple2<String, Double>> reduced = data
 					.distinct(1).name("reducer");
 
-			reduced.print().name("sink");
+			reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
 
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -199,7 +198,6 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(8, sinkNode.getParallelism());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
index 76b3b0e..7328423 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
@@ -20,30 +20,24 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityCoGrouper;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,25 +45,24 @@ import org.junit.Test;
  * This test case has been created to validate that correct strategies are used if orders within groups are
  * requested.
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class GroupOrderTest extends CompilerTestBase {
 
 	@Test
 	public void testReduceWithGroupOrder() {
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		
-		ReduceOperator reduce = ReduceOperator.builder(new IdentityReduce()).keyField(IntValue.class, 2).name("Reduce").input(source).build();
-		Ordering groupOrder = new Ordering(5, StringValue.class, Order.DESCENDING);
-		reduce.setGroupOrder(groupOrder);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, reduce, "Sink");
-		
-		
-		Plan plan = new Plan(sink, "Test Temp Task");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple4<Long, Long, Long, Long>> set1 = env.readCsvFile("/tmp/fake.csv")
+				.types(Long.class, Long.class, Long.class, Long.class);
+
+		set1.groupBy(1).sortGroup(3, Order.DESCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple4<Long, Long, Long, Long>>()).name("Reduce")
+				.output(new DiscardingOutputFormat<Tuple4<Long, Long, Long, Long>>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan;
+
 		try {
 			oPlan = compileNoStats(plan);
 		} catch(CompilerException ce) {
@@ -89,38 +82,35 @@ public class GroupOrderTest extends CompilerTestBase {
 		Channel c = reducer.getInput();
 		Assert.assertEquals(LocalStrategy.SORT, c.getLocalStrategy());
 		
-		FieldList ship = new FieldList(2);
-		FieldList local = new FieldList(2, 5);
+		FieldList ship = new FieldList(1);
+		FieldList local = new FieldList(1, 3);
 		Assert.assertEquals(ship, c.getShipStrategyKeys());
 		Assert.assertEquals(local, c.getLocalStrategyKeys());
 		Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
 		
 		// check that we indeed sort descending
-		Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);
+		Assert.assertEquals(false, c.getLocalStrategySortOrder()[1]);
 	}
 	
 	@Test
 	public void testCoGroupWithGroupOrder() {
 		// construct the plan
-		FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source1");
-		FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source2");
-		
-		CoGroupOperator coGroup = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 3, 6)
-				.keyField(LongValue.class, 0, 0)
-				.name("CoGroup").input1(source1).input2(source2).build();
-		
-		Ordering groupOrder1 = new Ordering(5, StringValue.class, Order.DESCENDING);
-		Ordering groupOrder2 = new Ordering(1, StringValue.class, Order.DESCENDING);
-		groupOrder2.appendOrdering(4, DoubleValue.class, Order.ASCENDING);
-		coGroup.setGroupOrderForInputOne(groupOrder1);
-		coGroup.setGroupOrderForInputTwo(groupOrder2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, coGroup, "Sink");
-		
-		Plan plan = new Plan(sink, "Reduce Group Order Test");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> set1 = env.readCsvFile("/tmp/fake1.csv")
+				.types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class);
+		DataSet<Tuple7<Long, Long, Long, Long, Long, Long, Long>> set2 = env.readCsvFile("/tmp/fake2.csv")
+				.types(Long.class, Long.class, Long.class, Long.class, Long.class, Long.class, Long.class);
+
+		set1.coGroup(set2).where(3,0).equalTo(6,0)
+				.sortFirstGroup(5, Order.DESCENDING)
+				.sortSecondGroup(1, Order.DESCENDING).sortSecondGroup(4, Order.ASCENDING)
+				.with(new IdentityCoGrouper<Tuple7<Long, Long, Long, Long, Long, Long, Long>>()).name("CoGroup")
+				.output(new DiscardingOutputFormat<Tuple7<Long, Long, Long, Long, Long, Long, Long>>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan;
+
 		try {
 			oPlan = compileNoStats(plan);
 		} catch(CompilerException ce) {
@@ -144,11 +134,11 @@ public class GroupOrderTest extends CompilerTestBase {
 		Assert.assertEquals(LocalStrategy.SORT, c1.getLocalStrategy());
 		Assert.assertEquals(LocalStrategy.SORT, c2.getLocalStrategy());
 		
-		FieldList ship1 = new FieldList(new int[] {3, 0});
-		FieldList ship2 = new FieldList(new int[] {6, 0});
+		FieldList ship1 = new FieldList(3, 0);
+		FieldList ship2 = new FieldList(6, 0);
 		
-		FieldList local1 = new FieldList(new int[] {3, 0, 5});
-		FieldList local2 = new FieldList(new int[] {6, 0, 1, 4});
+		FieldList local1 = new FieldList(3, 0, 5);
+		FieldList local2 = new FieldList(6, 0, 1, 4);
 		
 		Assert.assertEquals(ship1, c1.getShipStrategyKeys());
 		Assert.assertEquals(ship2, c2.getShipStrategyKeys());
@@ -161,8 +151,8 @@ public class GroupOrderTest extends CompilerTestBase {
 		Assert.assertTrue(c2.getLocalStrategySortOrder()[1] == coGroupNode.getSortOrders()[1]);
 		
 		// check that the local group orderings are correct
-		Assert.assertTrue(c1.getLocalStrategySortOrder()[2] == groupOrder1.getFieldSortDirections()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[2] == groupOrder2.getFieldSortDirections()[0]);
-		Assert.assertTrue(c2.getLocalStrategySortOrder()[3] == groupOrder2.getFieldSortDirections()[1]);
+		Assert.assertEquals(false, c1.getLocalStrategySortOrder()[2]);
+		Assert.assertEquals(false, c2.getLocalStrategySortOrder()[2]);
+		Assert.assertEquals(true, c2.getLocalStrategySortOrder()[3]);
 	}
 }

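The rewritten assertions above encode the requested group orders as booleans: an ascending sort shows up as true and a descending sort as false in the channel's local-strategy sort-order array. A small hypothetical helper (not in the commit) documenting that mapping:

import org.apache.flink.api.common.operators.Order;

public final class SortOrderFlags {

	// Boolean flag corresponding to a requested order in the optimizer's sort-order arrays.
	static boolean toSortFlag(Order order) {
		return order == Order.ASCENDING;   // DESCENDING -> false
	}

	public static void main(String[] args) {
		System.out.println(toSortFlag(Order.DESCENDING)); // false, matches assertEquals(false, ...)
		System.out.println(toSortFlag(Order.ASCENDING));  // true,  matches assertEquals(true, ...)
	}
}
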
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 52e9a2d..adca504 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -18,21 +18,16 @@
 
 package org.apache.flink.optimizer;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityCrosser;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
 import org.junit.Test;
 
 /**
@@ -41,7 +36,7 @@ import org.junit.Test;
  *   <li> Ticket 158
  * </ul>
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class HardPlansCompilationTest extends CompilerTestBase {
 	
 	/**
@@ -54,27 +49,21 @@ public class HardPlansCompilationTest extends CompilerTestBase {
 	@Test
 	public void testTicket158() {
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		
-		MapOperator map = MapOperator.builder(new IdentityMap()).name("Map1").input(source).build();
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce1").input(map).build();
-		
-		CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub()).name("Cross1").input1(reduce1).input2(source).build();
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce2").input(cross1).build();
-		
-		CrossOperator cross2 = CrossOperator.builder(new DummyCrossStub()).name("Cross2").input1(reduce2).input2(source).build();
-		
-		ReduceOperator reduce3 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce3").input(cross2).build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setInput(reduce3);
-		
-		Plan plan = new Plan(sink, "Test Temp Task");
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Long> set1 = env.generateSequence(0,1);
+
+		set1.map(new IdentityMapper<Long>()).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+				.cross(set1).with(new IdentityCrosser<Long>()).withForwardedFieldsFirst("*").name("Cross1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce2")
+				.cross(set1).with(new IdentityCrosser<Long>()).name("Cross2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce3")
+				.output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		OptimizedPlan oPlan = compileNoStats(plan);
+
 		JobGraphGenerator jobGen = new JobGraphGenerator();
 		jobGen.compileJobGraph(oPlan);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index 0afbe93..269be6e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -72,7 +73,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 					.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
 					.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
 			
-			iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();
+			iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			OptimizedPlan p = compileNoStats(env.createProgramPlan());
 			
@@ -104,7 +106,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(mappedBulk, edges);
 			
-			depResult.print();
+			depResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -120,7 +122,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -140,7 +141,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> depResult = doDeltaIteration(bulkResult, edges);
 			
-			depResult.print();
+			depResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -156,7 +157,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -176,7 +176,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> secondResult = doDeltaIteration(firstResult, edges);
 			
-			secondResult.print();
+			secondResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -192,7 +192,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -208,7 +207,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> input2 = env.readCsvFile("/some/file/path").types(Long.class, Long.class);
 			
-			doBulkIteration(input1, input2).print();
+			doBulkIteration(input1, input2).output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -226,7 +225,6 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			new JobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
@@ -253,7 +251,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			initialWorkset
 				.join(result, JoinHint.REPARTITION_HASH_FIRST)
 				.where(0).equalTo(0)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
 			
 			Plan p = env.createProgramPlan();
 			compileNoStats(p);
@@ -295,7 +293,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
 			
-			result.print();
+			result.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -348,7 +346,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 				.flatMap(new FlatMapJoin());
 		
 		DataSet<Tuple2<Long, Long>> depResult = depIteration.closeWith(updatedComponentId, updatedComponentId);
-		
+
 		return depResult;
 		
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index 34fc085..3a51451 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -52,7 +53,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
 			
-			outerResult.print();
+			outerResult.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -88,7 +89,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
 			
-			outerResult.print();
+			outerResult.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -126,7 +127,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Long> mainResult = mainIteration.closeWith(joined);
 			
-			mainResult.print();
+			mainResult.output(new DiscardingOutputFormat<Long>());
 			
 			Plan p = env.createProgramPlan();
 			
@@ -164,7 +165,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			
 			DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
 			
-			mainResult.print();
+			mainResult.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
 			
 			Plan p = env.createProgramPlan();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
index 8236f10..9ddff33 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.optimizer;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -24,22 +28,13 @@ import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Visitor;
 import org.junit.Test;
 
@@ -50,7 +45,7 @@ import org.junit.Test;
  *       parallelism between tasks is increased or decreased.
  * </ul>
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class ParallelismChangeTest extends CompilerTestBase {
 	
 	/**
@@ -62,34 +57,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+		set1.map(new IdentityMapper<Long>())
+					.withForwardedFields("*").setParallelism(p).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(p).name("Reduce1")
+				.map(new IdentityMapper<Long>())
+					.withForwardedFields("*").setParallelism(p * 2).name("Map2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
@@ -116,33 +101,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+		set1.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Reduce1")
+				.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
@@ -170,34 +146,24 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithIncreasingLocalParallelism() {
-		final int degOfPar = 2 * DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM * 2;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar * 2);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar * 2);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar * 2);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(p);
+
+		set1.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map1")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Reduce1")
+				.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Map2")
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Reduce2")
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(p * 2).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
 		
@@ -217,38 +183,27 @@ public class ParallelismChangeTest extends CompilerTestBase {
 				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
 	}
 	
-	
-	
 	@Test
 	public void checkPropertyHandlingWithDecreasingParallelism() {
-		final int degOfPar = DEFAULT_PARALLELISM;
-		
+		final int p = DEFAULT_PARALLELISM;
+
 		// construct the plan
-		FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-		source.setParallelism(degOfPar * 2);
-		
-		MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-		map1.setParallelism(degOfPar * 2);
-		map1.setInput(source);
-		
-		ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-		reduce1.setParallelism(degOfPar * 2);
-		reduce1.setInput(map1);
-		
-		MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-		map2.setParallelism(degOfPar);
-		map2.setInput(reduce1);
-		
-		ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-		reduce2.setParallelism(degOfPar);
-		reduce2.setInput(map2);
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-		sink.setParallelism(degOfPar);
-		sink.setInput(reduce2);
-		
-		Plan plan = new Plan(sink, "Test Increasing parallelism");
-		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(p);
+
+		env
+			.generateSequence(0, 1).setParallelism(p * 2)
+			.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Map1")
+			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p * 2).name("Reduce1")
+			.map(new IdentityMapper<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Map2")
+			.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+				.withForwardedFields("*").setParallelism(p).name("Reduce2")
+			.output(new DiscardingOutputFormat<Long>()).setParallelism(p).name("Sink");
+
+		JavaPlan plan = env.createProgramPlan();
 		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
 
@@ -284,40 +239,29 @@ public class ParallelismChangeTest extends CompilerTestBase {
 	 */
 	@Test
 	public void checkPropertyHandlingWithTwoInputs() {
+
 		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
 
-		FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-		
-		ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceA)
-			.build();
-		ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-			.input(sourceB)
-			.build();
-		
-		JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-			.input1(redA)
-			.input2(redB)
-			.build();
-		
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-		
-		sourceA.setParallelism(5);
-		sourceB.setParallelism(7);
-		redA.setParallelism(5);
-		redB.setParallelism(7);
-		
-		mat.setParallelism(5);
-		
-		sink.setParallelism(5);
-		
-		
-		// return the PACT plan
-		Plan plan = new Plan(sink, "Partition on DoP Change");
-		
+		DataSet<Long> set1 = env.generateSequence(0,1).setParallelism(5);
+		DataSet<Long> set2 = env.generateSequence(0,1).setParallelism(7);
+
+		DataSet<Long> reduce1 = set1
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(5);
+		DataSet<Long> reduce2 = set2
+				.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+					.withForwardedFields("*").setParallelism(7);
+
+		reduce1.join(reduce2).where("*").equalTo("*")
+					.with(new IdentityJoiner<Long>()).setParallelism(5)
+				.output(new DiscardingOutputFormat<Long>()).setParallelism(5);
+
+		JavaPlan plan = env.createProgramPlan();
+		// submit the plan to the compiler
 		OptimizedPlan oPlan = compileNoStats(plan);
-		
+
 		JobGraphGenerator jobGen = new JobGraphGenerator();
 		
 		//Compile plan to verify that no error is thrown

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
index 72effc1..365726d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -46,7 +47,7 @@ public class PartitionPushdownTest extends CompilerTestBase {
 			input
 				.groupBy(0, 1).sum(2)
 				.groupBy(0).sum(1)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
@@ -82,7 +83,7 @@ public class PartitionPushdownTest extends CompilerTestBase {
 			input
 				.groupBy(0).sum(1)
 				.groupBy(0, 1).sum(2)
-				.print();
+				.output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
 			
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);


[2/4] flink git commit: [FLINK-1682] Ported optimizer unit tests from Record API to Java API

Posted by fh...@apache.org.
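For orientation, the porting pattern used throughout the ParallelismChangeTest hunks above boils down to: rebuild the same source -> map -> reduce -> sink shape with the Java DataSet API, then obtain a Plan via createProgramPlan() and hand it to the optimizer exactly as the old Record API Plan was. A minimal sketch of one such ported test follows; the class name and import paths not visible in the hunks are assumptions, while IdentityMapper, IdentityGroupReducer, DEFAULT_PARALLELISM and compileNoStats() are the existing optimizer test utilities already used above.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.JavaPlan;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.junit.Test;

    // Sketch only: extends the same CompilerTestBase as the ported tests and
    // reuses its IdentityMapper / IdentityGroupReducer helpers and compileNoStats().
    public class PortedTestSketch extends CompilerTestBase {

        @Test
        public void checkSimplePlanCompiles() {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(DEFAULT_PARALLELISM);

            // source -> map -> reduce -> sink, replacing the old Record API wiring of
            // FileDataSource, MapOperator, ReduceOperator and FileDataSink
            env.generateSequence(0, 1)
                .map(new IdentityMapper<Long>())
                    .withForwardedFields("*").name("Map1")
                .groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
                    .withForwardedFields("*").name("Reduce1")
                .output(new DiscardingOutputFormat<Long>()).name("Sink");

            // the Java API plan is compiled the same way the old Plan was
            JavaPlan plan = env.createProgramPlan();
            OptimizedPlan oPlan = compileNoStats(plan);
        }
    }

The DiscardingOutputFormat sink plays the role of the old FileDataSink: it terminates the program without writing any output, which is all these compile-only tests need.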
http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
index 920b713..fe0a533 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
@@ -29,14 +29,13 @@ import java.util.Set;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.util.Visitor;
 
 /**
  * Utility to get operator instances from plans via name.
  */
-@SuppressWarnings("deprecation")
 public class OperatorResolver implements Visitor<Operator<?>> {
 	
 	private final Map<String, List<Operator<?>>> map;
@@ -109,11 +108,11 @@ public class OperatorResolver implements Visitor<Operator<?>> {
 			list.add(visitable);
 			
 			// recurse into bulk iterations
-			if (visitable instanceof BulkIteration) {
-				((BulkIteration) visitable).getNextPartialSolution().accept(this);
-			} else if (visitable instanceof DeltaIteration) {
-				((DeltaIteration) visitable).getSolutionSetDelta().accept(this);
-				((DeltaIteration) visitable).getNextWorkset().accept(this);
+			if (visitable instanceof BulkIterationBase) {
+				((BulkIterationBase) visitable).getNextPartialSolution().accept(this);
+			} else if (visitable instanceof DeltaIterationBase) {
+				((DeltaIterationBase) visitable).getSolutionSetDelta().accept(this);
+				((DeltaIterationBase) visitable).getNextWorkset().accept(this);
 			}
 			
 			return true;