You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/24 18:17:58 UTC

[15/16] flink git commit: [FLINK-2901] Remove Record API dependencies from flink-tests #1

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
deleted file mode 100644
index 3ce021b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-	private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + "5\n";
-	private static final String EXPECTED = "22\n";
-
-	protected String dataPath;
-	protected String resultPath;
-
-	public IterationTerminationWithTwoTails(){
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", INPUT);
-		resultPath = getTempFilePath("result");
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED, resultPath);
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		return getTestPlanPlan(parallelism, dataPath, resultPath);
-	}
-	
-	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
-
-		FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(initialInput);
-		iteration.setMaximumNumberOfIterations(5);
-		Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-		ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer())
-				.input(iteration.getPartialSolution())
-				.name("Compute sum (Reduce)")
-				.build();
-		
-		iteration.setNextPartialSolution(sumReduce);
-		
-		MapOperator terminationMapper = MapOperator.builder(new TerminationMapper())
-				.input(iteration.getPartialSolution())
-				.name("Compute termination criterion (Map)")
-				.build();
-		
-		iteration.setTerminationCriterion(terminationMapper);
-
-		FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output");
-		CsvOutputFormat.configureRecordFormat(finalResult)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(StringValue.class, 0);
-
-		Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
-		plan.setDefaultParallelism(4);
-		Assert.assertTrue(plan.getDefaultParallelism() > 1);
-		return plan;
-	}
-	
-	static final class SumReducer extends ReduceFunction implements Serializable {
-		
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void reduce(Iterator<Record> it, Collector<Record> out) {
-			// Compute the sum
-			int sum = 0;
-			
-			while (it.hasNext()) {
-				sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1;
-			}
-			
-			out.collect(new Record(new StringValue(Integer.toString(sum))));
-		}
-	}
-	
-	public static class TerminationMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> collector) {
-			
-			int currentSum = Integer.parseInt(record.getField(0, StringValue.class).getValue());
-			
-			if(currentSum < 21)
-				collector.collect(record);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index cb16c15..ab66f31 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -18,91 +18,34 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationWithAllReducerITCase extends RecordAPITestBase {
-
-	private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class IterationWithAllReducerITCase extends JavaProgramTestBase {
 	private static final String EXPECTED = "1\n";
 
-	protected String dataPath;
-	protected String resultPath;
-
-	public IterationWithAllReducerITCase(){
-		setTaskManagerNumSlots(4);
-	}
-
 	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", INPUT);
-		resultPath = getTempFilePath("result");
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(EXPECTED, resultPath);
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
-		return plan;
-	}
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
 
-	
-	private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
+		DataSet<String> initialInput = env.fromElements("1", "1", "1", "1", "1", "1", "1", "1");
 
-		FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-		
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(initialInput);
-		iteration.setMaximumNumberOfIterations(5);
-		
-		Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
+		IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");
 
-		ReduceOperator sumReduce = ReduceOperator.builder(new PickOneReducer())
-				.input(iteration.getPartialSolution())
-				.name("Compute sum (Reduce)")
-				.build();
-		
-		iteration.setNextPartialSolution(sumReduce);
+		DataSet<String> sumReduce = iteration.reduce(new ReduceFunction<String>(){
+			@Override
+			public String reduce(String value1, String value2) throws Exception {
+				return value1;
+			}
+		}).name("Compute sum (Reduce)");
 
-		FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output");
-		CsvOutputFormat.configureRecordFormat(finalResult)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(StringValue.class, 0);
+		List<String> result = iteration.closeWith(sumReduce).collect();
 
-		Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
-		
-		plan.setDefaultParallelism(numSubTasks);
-		Assert.assertTrue(plan.getDefaultParallelism() > 1);
-		
-		return plan;
-	}
-	
-	public static final class PickOneReducer extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void reduce(Iterator<Record> it, Collector<Record> out) {
-			out.collect(it.next());
-		}
+		compareResultAsText(result, EXPECTED);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index c11c9ea..c283df1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -18,43 +18,25 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
+import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class IterationWithChainingITCase extends RecordAPITestBase {
+public class IterationWithChainingITCase extends JavaProgramTestBase {
 
 	private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
 
 	private String dataPath;
 	private String resultPath;
 
-	public IterationWithChainingITCase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(parallelism);
-	}
-
 	@Override
 	protected void preSubmit() throws Exception {
 		dataPath = createTempFile("data_points.txt", DATA_POINTS);
@@ -62,63 +44,35 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
 	}
 
 	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(DATA_POINTS, resultPath);
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		return getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
-	}
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
 
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Configuration config1 = new Configuration();
-		config1.setInteger("ChainedMapperITCase#NoSubtasks", parallelism);
-		return toParameterList(config1);
-	}
+		DataSet<Tuple2<Integer, CoordVector>> initialInput
+				= env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
 
-	public static final class IdentityMapper extends MapFunction implements Serializable {
+		IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2).name("Loop");
 
-		private static final long serialVersionUID = 1L;
+		DataSet<Tuple2<Integer, CoordVector>> identity
+				= iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+					@Override
+					public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception {
+						for (Tuple2<Integer, CoordVector> value : values) {
+							out.collect(value);
+						}
+					}
+				}).map(new MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+					@Override
+					public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> value) throws Exception {
+						return value;
+					}
 
-		@Override
-		public void map(Record rec, Collector<Record> out) {
-			out.collect(rec);
-		}
-	}
+				});
 
-	public static final class DummyReducer extends ReduceFunction implements Serializable {
+		iteration.closeWith(identity).writeAsFormattedText(resultPath, new PointFormatter());
 
-		private static final long serialVersionUID = 1L;
+		env.execute("Iteration with chained map test");
 
-		@Override
-		public void reduce(Iterator<Record> it, Collector<Record> out) {
-			while (it.hasNext()) {
-				out.collect(it.next());
-			}
-		}
-	}
-
-	static Plan getTestPlan(int numSubTasks, String input, String output) {
-
-		FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
-		initialInput.setParallelism(1);
-
-		BulkIteration iteration = new BulkIteration("Loop");
-		iteration.setInput(initialInput);
-		iteration.setMaximumNumberOfIterations(2);
-
-		ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution())
-				.name("Reduce something").build();
-
-		MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
-		iteration.setNextPartialSolution(dummyMap);
-
-		FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
-		Plan plan = new Plan(finalResult, "Iteration with chained map test");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
+		compareResultsByLinesInMemory(DATA_POINTS, resultPath);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index 2a4a4b7..8756429 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -25,10 +25,11 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 public class IterationWithUnionITCase extends JavaProgramTestBase {
@@ -54,32 +55,32 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
 	protected void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		
-		DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
+		DataSet<Tuple2<Integer, CoordVector>> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
 		
-		IterativeDataSet<Record> iteration = initialInput.iterate(2);
+		IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2);
 		
-		DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper());
+		DataSet<Tuple2<Integer, CoordVector>> result = iteration.union(iteration).map(new IdentityMapper());
 		
-		iteration.closeWith(result).write(new PointOutFormat(), this.resultPath);
+		iteration.closeWith(result).writeAsFormattedText(this.resultPath, new PointFormatter());
 		
 		env.execute();
 	}
 	
-	static final class IdentityMapper implements MapFunction<Record, Record>, Serializable {
+	static final class IdentityMapper implements MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Record map(Record rec) {
+		public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> rec) {
 			return rec;
 		}
 	}
 
-	static class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable {
+	static class DummyReducer implements GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterable<Record> it, Collector<Record> out) {
-			for (Record r : it) {
+		public void reduce(Iterable<Tuple2<Integer, CoordVector>> it, Collector<Tuple2<Integer, CoordVector>> out) {
+			for (Tuple2<Integer, CoordVector> r : it) {
 				out.collect(r);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
deleted file mode 100644
index ac3659a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
-
-	protected String dataPath;
-	protected String clusterPath;
-	protected String resultPath;
-
-	public IterativeKMeansITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
-		clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
-		resultPath = getTempDirPath("result");
-	}
-	
-	@Override
-	protected Plan getTestJob() {
-		KMeansBroadcast kmi = new KMeansBroadcast();
-		return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
-	}
-
-
-	@Override
-	protected void postSubmit() throws Exception {
-		List<String> resultLines = new ArrayList<String>();
-		readAllResultLines(resultLines, resultPath);
-		
-		KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
deleted file mode 100644
index fcf43df..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class KMeansITCase extends RecordAPITestBase {
-
-	protected String dataPath;
-	protected String clusterPath;
-	protected String resultPath;
-
-	public KMeansITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
-		clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
-		resultPath = getTempDirPath("result");
-	}
-	
-	@Override
-	protected Plan getTestJob() {
-		KMeansBroadcast kmi = new KMeansBroadcast();
-		return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
-	}
-
-
-	@Override
-	protected void postSubmit() throws Exception {
-		List<String> resultLines = new ArrayList<String>();
-		readAllResultLines(resultLines, resultPath);
-		
-		KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
index ab8ff45..959a17a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -21,11 +21,24 @@ package org.apache.flink.test.optimizer.examples;
 import static org.junit.Assert.*;
 
 import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -34,11 +47,10 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class KMeansSingleStepTest extends CompilerTestBase {
 	
 	private static final String DATAPOINTS = "Data Points";
@@ -54,16 +66,15 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 	
 	@Test
 	public void testCompileKMeansSingleStepWithStats() {
-		
-		KMeansSingleStep kmi = new KMeansSingleStep();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+
+		Plan p = getKMeansPlan();
 		p.setExecutionConfig(new ExecutionConfig());
 		// set the statistics
 		OperatorResolver cr = getContractResolver(p);
-		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
-		FileDataSource centersSource = cr.getNode(CENTERS);
-		setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
-		setSourceStatistics(centersSource, 1024*1024, 32f);
+		GenericDataSourceBase pointsSource = cr.getNode(DATAPOINTS);
+		GenericDataSourceBase centersSource = cr.getNode(CENTERS);
+		setSourceStatistics(pointsSource, 100l * 1024 * 1024 * 1024, 32f);
+		setSourceStatistics(centersSource, 1024 * 1024, 32f);
 		
 		OptimizedPlan plan = compileWithStats(p);
 		checkPlan(plan);
@@ -71,9 +82,8 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 
 	@Test
 	public void testCompileKMeansSingleStepWithOutStats() {
-		
-		KMeansSingleStep kmi = new KMeansSingleStep();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+
+		Plan p = getKMeansPlan();
 		p.setExecutionConfig(new ExecutionConfig());
 		OptimizedPlan plan = compileNoStats(p);
 		checkPlan(plan);
@@ -97,7 +107,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
 		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
 		
-		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+		assertEquals(DriverStrategy.MAP, mapper.getDriverStrategy());
 		
 		assertNull(mapper.getInput().getLocalStrategyKeys());
 		assertNull(mapper.getInput().getLocalStrategySortOrder());
@@ -127,4 +137,145 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
 	}
+
+	public static Plan getKMeansPlan() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			KMeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
+		} catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("KMeans failed with an exception");
+		}
+		return env.getPlan();
+	}
+
+	public static void KMeans(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Point> points = env.readCsvFile(args[0])
+				.fieldDelimiter(" ")
+				.includeFields(true, true)
+				.types(Double.class, Double.class)
+				.name(DATAPOINTS)
+				.map(new MapFunction<Tuple2<Double, Double>, Point>() {
+					@Override
+					public Point map(Tuple2<Double, Double> value) throws Exception {
+						return new Point(value.f0, value.f1);
+					}
+				});
+
+		DataSet<Centroid> centroids = env.readCsvFile(args[1])
+				.fieldDelimiter(" ")
+				.includeFields(true, true, true)
+				.types(Integer.class, Double.class, Double.class)
+				.name(CENTERS)
+				.map(new MapFunction<Tuple3<Integer, Double, Double>, Centroid>() {
+					@Override
+					public Centroid map(Tuple3<Integer, Double, Double> value) throws Exception {
+						return new Centroid(value.f0, value.f1, value.f2);
+					}
+				});
+
+		DataSet<Tuple3<Integer, Point, Integer>> newCentroids = points
+				.map(new SelectNearestCenter()).name(MAPPER_NAME).withBroadcastSet(centroids, "centroids");
+
+		DataSet<Tuple3<Integer, Point, Integer>> recomputeClusterCenter
+				= newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME);
+
+		recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n", " ").name(SINK);
+
+		env.execute("KMeans Example");
+	}
+
+	public static class Point extends Tuple2<Double, Double> {
+		public Point(double x, double y) {
+			this.f0 = x;
+			this.f1 = y;
+		}
+
+		public Point add(Point other) {
+			f0 += other.f0;
+			f1 += other.f1;
+			return this;
+		}
+
+		public Point div(long val) {
+			f0 /= val;
+			f1 /= val;
+			return this;
+		}
+
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((f0 - other.f0) * (f0 - other.f0) + (f1 - other.f1) * (f1 - other.f1));
+		}
+
+		public double euclideanDistance(Centroid other) {
+			return Math.sqrt((f0 - other.f1.f0) * (f0 - other.f1.f0) + (f1 - other.f1.f1) * (f1 - other.f1.f1));
+		}
+	}
+
+	public static class Centroid extends Tuple2<Integer, Point> {
+		public Centroid(int id, double x, double y) {
+			this.f0 = id;
+			this.f1 = new Point(x, y);
+		}
+
+		public Centroid(int id, Point p) {
+			this.f0 = id;
+			this.f1 = p;
+		}
+	}
+
+	/**
+	 * Determines the closest cluster center for a data point.
+	 */
+	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple3<Integer, Point, Integer>> {
+		private Collection<Centroid> centroids;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+		}
+
+		@Override
+		public Tuple3<Integer, Point, Integer> map(Point p) throws Exception {
+			double minDistance = Double.MAX_VALUE;
+			int closestCentroidId = -1;
+			for (Centroid centroid : centroids) {
+				double distance = p.euclideanDistance(centroid);
+				if (distance < minDistance) {
+					minDistance = distance;
+					closestCentroidId = centroid.f0;
+				}
+			}
+			return new Tuple3<>(closestCentroidId, p, 1);
+		}
+	}
+
+	@Combinable
+	public static final class RecomputeClusterCenter extends RichGroupReduceFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>> {
+		@Override
+		public void reduce(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
+			int id = -1;
+			double x = 0;
+			double y = 0;
+			int count = 0;
+			for (Tuple3<Integer, Point, Integer> value : values) {
+				id = value.f0;
+				x += value.f1.f0;
+				y += value.f1.f1;
+				count += value.f2;
+			}
+			out.collect(new Tuple3<>(id, new Point(x, y), count));
+		}
+
+		@Override
+		public void combine(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
+			reduce(values, out);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index f4efb8a..e929913 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -22,10 +22,23 @@ import java.util.Arrays;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -35,23 +48,25 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
  * Tests TPCH Q3 (simplified) under various input conditions.
  */
-@SuppressWarnings("deprecation")
+@SuppressWarnings("serial")
 public class RelationalQueryCompilerTest extends CompilerTestBase {
 	
 	private static final String ORDERS = "Orders";
 	private static final String LINEITEM = "LineItems";
 	private static final String MAPPER_NAME = "FilterO";
 	private static final String JOIN_NAME = "JoinLiO";
+	private static final String REDUCE_NAME = "AggLiO";
+	private static final String SINK = "Output";
 	
 	private final FieldList set0 = new FieldList(0);
-	private final FieldList set01 = new FieldList(new int[] {0,1});
+	private final FieldList set01 = new FieldList(0,1);
 	private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
 	
 	// ------------------------------------------------------------------------
@@ -63,8 +78,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	@Test
 	public void testQueryNoStatistics() {
 		try {
-			TPCHQuery3 query = new TPCHQuery3();
-			Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+			Plan p = getTPCH3Plan();
 			p.setExecutionConfig(defaultExecutionConfig);
 			// compile
 			final OptimizedPlan plan = compileNoStats(p);
@@ -72,12 +86,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
 			
 			// get the nodes from the final plan
-			final SinkPlanNode sink = or.getNode("Output");
-			final SingleInputPlanNode reducer = or.getNode("AggLio");
+			final SinkPlanNode sink = or.getNode(SINK);
+			final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
 			final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
 					(SingleInputPlanNode) reducer.getPredecessor() : null;
-			final DualInputPlanNode join = or.getNode("JoinLiO");
-			final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+			final DualInputPlanNode join = or.getNode(JOIN_NAME);
+			final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
 			
 			// verify the optimizer choices
 			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
@@ -95,7 +109,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testQueryAnyValidPlan() {
-		testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
+		testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, 0.05f, true, true, true, false, true);
 	}
 	
 	/**
@@ -103,7 +117,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testQueryWithSizeZeroInputs() {
-		testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
+		testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false, true);
 	}
 	
 	/**
@@ -111,7 +125,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testQueryWithStatsForBroadcastHash() {
-		testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.05f, true, false, true, false, false);
+		testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.01f, 0.05f, true, false, true, false, false);
 	}
 	
 	/**
@@ -119,7 +133,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testQueryWithStatsForRepartitionAny() {
-		testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
+		testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.1f, 0.5f, false, true, true, true, true);
 	}
 	
 	/**
@@ -128,34 +142,23 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	 */
 	@Test
 	public void testQueryWithStatsForRepartitionMerge() {
-		TPCHQuery3 query = new TPCHQuery3();
-		Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+		Plan p = getTPCH3Plan();
 		p.setExecutionConfig(defaultExecutionConfig);
 		// set compiler hints
 		OperatorResolver cr = getContractResolver(p);
-		JoinOperator match = cr.getNode("JoinLiO");
+		DualInputOperator<?,?,?,?> match = cr.getNode(JOIN_NAME);
 		match.getCompilerHints().setFilterFactor(100f);
 		
-		testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
+		testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.01f, 100f, false, true, false, false, true);
 	}
 	
 	// ------------------------------------------------------------------------
-	
-	private void testQueryGeneric(long orderSize, long lineItemSize, 
-			float ordersFilterFactor, 
-			boolean broadcastOkay, boolean partitionedOkay,
-			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-	{
-		testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
-	}
-	
 	private void testQueryGeneric(long orderSize, long lineItemSize, 
 			float ordersFilterFactor, float joinFilterFactor,
 			boolean broadcastOkay, boolean partitionedOkay,
 			boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
 	{
-		TPCHQuery3 query = new TPCHQuery3();
-		Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+		Plan p = getTPCH3Plan();
 		p.setExecutionConfig(defaultExecutionConfig);
 		testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
 	}
@@ -168,10 +171,10 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 		try {
 			// set statistics
 			OperatorResolver cr = getContractResolver(p);
-			FileDataSource ordersSource = cr.getNode(ORDERS);
-			FileDataSource lineItemSource = cr.getNode(LINEITEM);
-			MapOperator mapper = cr.getNode(MAPPER_NAME);
-			JoinOperator joiner = cr.getNode(JOIN_NAME);
+			GenericDataSourceBase<?,?> ordersSource = cr.getNode(ORDERS);
+			GenericDataSourceBase<?,?> lineItemSource = cr.getNode(LINEITEM);
+			SingleInputOperator<?,?,?> mapper = cr.getNode(MAPPER_NAME);
+			DualInputOperator<?,?,?,?> joiner = cr.getNode(JOIN_NAME);
 			setSourceStatistics(ordersSource, orderSize, 100f);
 			setSourceStatistics(lineItemSource, lineitemSize, 140f);
 			mapper.getCompilerHints().setAvgOutputRecordSize(16f);
@@ -183,12 +186,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
 			
 			// get the nodes from the final plan
-			final SinkPlanNode sink = or.getNode("Output");
-			final SingleInputPlanNode reducer = or.getNode("AggLio");
+			final SinkPlanNode sink = or.getNode(SINK);
+			final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
 			final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
 					(SingleInputPlanNode) reducer.getPredecessor() : null;
-			final DualInputPlanNode join = or.getNode("JoinLiO");
-			final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+			final DualInputPlanNode join = or.getNode(JOIN_NAME);
+			final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
 			
 			checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
 			
@@ -230,7 +233,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 	// ------------------------------------------------------------------------
 	//  Checks for special conditions
 	// ------------------------------------------------------------------------
-	
+
 	private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
 			SingleInputPlanNode reducer, SinkPlanNode sink)
 	{
@@ -239,7 +242,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
 		
 		// check the driver strategies that are always fix
-		Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.FLAT_MAP, map.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
 		if (combiner != null) {
@@ -348,4 +351,73 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			return false;
 		}
 	}
+
+	public static Plan getTPCH3Plan() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			TCPH3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
+		} catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("TCPH3 failed with an exception");
+		}
+		return env.getPlan();
+	}
+
+	public static void TCPH3(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(Integer.parseInt(args[0]));
+
+		//order id, order status, order data, order prio, ship prio
+		DataSet<Tuple5<Long, String, String, String, Integer>> orders
+				= env.readCsvFile(args[1])
+				.fieldDelimiter("|").lineDelimiter("\n")
+				.includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
+				.name(ORDERS);
+
+		//order id, extended price
+		DataSet<Tuple2<Long, Double>> lineItems
+				= env.readCsvFile(args[2])
+				.fieldDelimiter("|").lineDelimiter("\n")
+				.includeFields("100001").types(Long.class, Double.class)
+				.name(LINEITEM);
+
+		DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME);
+
+		DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME);
+
+		DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME);
+
+		aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
+
+		env.execute();
+	}
+
+	@ForwardedFields("f0; f4->f1")
+	public static class FilterO implements FlatMapFunction<Tuple5<Long, String, String, String, Integer>, Tuple2<Long, Integer>> {
+		@Override
+		public void flatMap(Tuple5<Long, String, String, String, Integer> value, Collector<Tuple2<Long, Integer>> out) throws Exception {
+			// not going to be executed
+		}
+	}
+
+	@ForwardedFieldsFirst("f0; f1")
+	public static class JoinLiO implements FlatJoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Double>, Tuple3<Long, Integer, Double>> {
+		@Override
+		public void join(Tuple2<Long, Integer> first, Tuple2<Long, Double> second, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
+			// not going to be executed
+		}
+	}
+
+	@ForwardedFields("f0; f1")
+	@Combinable
+	public static class AggLiO extends RichGroupReduceFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> {
+		@Override
+		public void reduce(Iterable<Tuple3<Long, Integer, Double>> values, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
+			// not going to be executed
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
index 99402a5..e134c7a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -20,7 +20,19 @@ package org.apache.flink.test.optimizer.iterations;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -33,13 +45,14 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
  *
  */
+@SuppressWarnings("serial")
 public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
 	
 	private static final String VERTEX_SOURCE = "Vertices";
@@ -59,10 +72,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
 	
 	@Test
 	public void testWorksetConnectedComponents() {
-		ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup();
-
-		Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
-				IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
+		Plan plan = getConnectedComponentsCoGroupPlan();
 		plan.setExecutionConfig(new ExecutionConfig());
 		OptimizedPlan optPlan = compileNoStats(plan);
 		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan);
@@ -134,4 +144,68 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
 		JobGraphGenerator jgg = new JobGraphGenerator();
 		jgg.compileJobGraph(optPlan);
 	}
+
+	public static Plan getConnectedComponentsCoGroupPlan() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			ConnectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
+		} catch (ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("ConnectedComponentsWithCoGroup failed with an exception");
+		}
+		return env.getPlan();
+	}
+
+	public static void ConnectedComponentsWithCoGroup(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(Integer.parseInt(args[0]));
+
+		DataSet<Tuple1<Long>> initialVertices = env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE);
+
+		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
+
+		DataSet<Tuple2<Long, Long>> verticesWithId = initialVertices.flatMap(new DummyMapFunction());
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration
+				= verticesWithId.iterateDelta(verticesWithId, Integer.parseInt(args[4]), 0).name(ITERATION_NAME);
+
+		DataSet<Tuple2<Long, Long>> joinWithNeighbors = iteration.getWorkset().join(edges)
+				.where(0).equalTo(0)
+				.with(new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH);
+
+		DataSet<Tuple2<Long, Long>> minAndUpdate = joinWithNeighbors.coGroup(iteration.getSolutionSet())
+				.where(0).equalTo(0)
+				.with(new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);
+
+		iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(args[3]).name(SINK);
+
+		env.execute();
+	}
+
+	public static class DummyMapFunction implements FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+		@Override
+		public void flatMap(Tuple1<Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			// won't be executed
+		}
+	}
+
+	public static class DummyJoinFunction implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void join(Tuple2<Long, Long> first, Tuple2<Long, Long> second, Collector<Tuple2<Long, Long>> out) throws Exception {
+			// won't be executed
+		}
+	}
+
+	@ForwardedFieldsFirst("f0->f0")
+	@ForwardedFieldsSecond("f0->f0")
+	public static class DummyCoGroupFunction implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) throws Exception {
+			// won't be executed
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
deleted file mode 100644
index 3785270..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.optimizer.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class IterativeKMeansTest extends CompilerTestBase {
-	
-	private static final String DATAPOINTS = "Data Points";
-	private static final String CENTERS = "Centers";
-	
-	private static final String MAPPER_NAME = "Find Nearest Centers";
-	private static final String REDUCER_NAME = "Recompute Center Positions";
-	
-	private static final String ITERATION_NAME = "k-means loop";
-	
-	private static final String SINK = "New Center Positions";
-	
-	private final FieldList set0 = new FieldList(0);
-	
-	// --------------------------------------------------------------------------------------------
-	//  K-Means (Bulk Iteration)
-	// --------------------------------------------------------------------------------------------
-	
-	@Test
-	public void testCompileKMeansSingleStepWithStats() {
-		
-		KMeansBroadcast kmi = new KMeansBroadcast();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-		p.setExecutionConfig(new ExecutionConfig());
-		// set the statistics
-		OperatorResolver cr = getContractResolver(p);
-		FileDataSource pointsSource = cr.getNode(DATAPOINTS);
-		FileDataSource centersSource = cr.getNode(CENTERS);
-		setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
-		setSourceStatistics(centersSource, 1024*1024, 32f);
-		
-		OptimizedPlan plan = compileWithStats(p);
-		checkPlan(plan);
-		
-		new JobGraphGenerator().compileJobGraph(plan);
-	}
-
-	@Test
-	public void testCompileKMeansSingleStepWithOutStats() {
-		
-		KMeansBroadcast kmi = new KMeansBroadcast();
-		Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-		p.setExecutionConfig(new ExecutionConfig());
-		OptimizedPlan plan = compileNoStats(p);
-		checkPlan(plan);
-		
-		new JobGraphGenerator().compileJobGraph(plan);
-	}
-	
-	private void checkPlan(OptimizedPlan plan) {
-		
-		OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-		
-		final SinkPlanNode sink = or.getNode(SINK);
-		final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
-		final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-		final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-		
-		final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
-		
-		// -------------------- outside the loop -----------------------
-		
-		// check the sink
-		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-		
-		// check the iteration
-		assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
-		assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
-		
-		
-		// -------------------- inside the loop -----------------------
-		
-		// check the mapper
-		assertEquals(1, mapper.getBroadcastInputs().size());
-		assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-		assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
-		assertFalse(mapper.getInput().isOnDynamicPath());
-		assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
-		assertTrue(mapper.getInput().getTempMode().isCached());
-		
-		assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
-		assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-		
-		assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-		
-		assertNull(mapper.getInput().getLocalStrategyKeys());
-		assertNull(mapper.getInput().getLocalStrategySortOrder());
-		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
-		assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-		
-		// check the combiner
-		Assert.assertNotNull(combiner);
-		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-		assertTrue(combiner.getInput().isOnDynamicPath());
-		
-		assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
-		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-		assertNull(combiner.getInput().getLocalStrategyKeys());
-		assertNull(combiner.getInput().getLocalStrategySortOrder());
-		assertEquals(set0, combiner.getKeys(0));
-		assertEquals(set0, combiner.getKeys(1));
-		
-		// check the reducer
-		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-		assertTrue(reducer.getInput().isOnDynamicPath());
-		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-		assertEquals(set0, reducer.getKeys(0));
-		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 24d9416..d186cbb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -25,13 +25,12 @@ import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -45,17 +44,34 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 	
 	@Test
 	public void dumpWordCount() {
-		dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			WordCount.main(new String[] {IN_FILE, OUT_FILE});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WordCount failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
 	public void dumpTPCH3() {
-		dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
-	}
-	
-	@Test
-	public void dumpKMeans() {
-		dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("TPCH3 failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
@@ -64,7 +80,6 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			// <points path> <centers path> <result path> <num iterations
 			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
@@ -77,17 +92,50 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 	
 	@Test
 	public void dumpWebLogAnalysis() {
-		dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WebLogAnalysis failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
 	public void dumpBulkIterationKMeans() {
-		dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("ConnectedComponents failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
-	public void dumpDeltaPageRank() {
-		dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+	public void dumpPageRank() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("PagaRank failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	private void dump(Plan p) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 49fe6d8..95a06c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -21,16 +21,17 @@ package org.apache.flink.test.optimizer.jsonplan;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -40,51 +41,102 @@ import org.junit.Test;
 /*
  * The tests in this class simply invokes the JSON dump code for the original plan.
  */
-public class PreviewPlanDumpTest {
+public class PreviewPlanDumpTest extends CompilerTestBase {
 	
-	protected static final String IN_FILE = OperatingSystem.isWindows() ?  "file:/c:/test/file" : "file:///test/file";
-	
-	protected static final String OUT_FILE = OperatingSystem.isWindows() ?  "file:/c:/test/output" : "file:///test/output";
-	
-	protected static final String[] NO_ARGS = new String[0];
-
 	@Test
 	public void dumpWordCount() {
-		dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
-		
-		// The web interface passes empty string-args to compute the preview of the
-		// job, so we should test this situation too
-		dump(new WordCount().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			WordCount.main(new String[] {IN_FILE, OUT_FILE});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WordCount failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
 	public void dumpTPCH3() {
-		dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
-		dump(new TPCHQuery3().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("TPCH3 failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
-	public void dumpKMeans() {
-		dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
-		dump(new KMeansSingleStep().getPlan(NO_ARGS));
+	public void dumpIterativeKMeans() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("KMeans failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
 	public void dumpWebLogAnalysis() {
-		dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
-		dump(new WebLogAnalysis().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WebLogAnalysis failed with an exception");
+		}
+		dump(env.getPlan());
 	}
-	
+
 	@Test
 	public void dumpBulkIterationKMeans() {
-		dump(new KMeansBroadcast().getPlan("4", IN_FILE, OUT_FILE));
-		dump(new KMeansBroadcast().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("ConnectedComponents failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	@Test
-	public void dumpDeltaPageRank() {
-		dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
-		dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
+	public void dumpPageRank() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("PagaRank failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 	
 	private void dump(Plan p) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 310ded8..ec2dbb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -29,8 +29,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.junit.After;
 import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.test.util.JavaProgramTestBase;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -62,8 +62,8 @@ public class NetworkStackThroughputITCase {
 
 	// ------------------------------------------------------------------------
 
-	// wrapper to reuse RecordAPITestBase code in runs via main()
-	private static class TestBaseWrapper extends RecordAPITestBase {
+	// wrapper to reuse JavaProgramTestBase code in runs via main()
+	private static class TestBaseWrapper extends JavaProgramTestBase {
 
 		private int dataVolumeGb;
 		private boolean useForwarder;
@@ -90,7 +90,6 @@ public class NetworkStackThroughputITCase {
 			setTaskManagerNumSlots(numSlots);
 		}
 
-		@Override
 		protected JobGraph getJobGraph() throws Exception {
 			return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
 		}
@@ -138,19 +137,19 @@ public class NetworkStackThroughputITCase {
 			return jobGraph;
 		}
 
-		@After
-		public void calculateThroughput() {
-			if (getJobExecutionResult() != null) {
-				int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
 
-				long dataVolumeMbit = dataVolumeGb * 8192;
-				long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
+		@Override
+		protected void testProgram() throws Exception {
+			JobExecutionResult jer = executor.submitJobAndWait(getJobGraph(), false);
+			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+			long dataVolumeMbit = dataVolumeGb * 8192;
+			long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
 
-				int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+			int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
 
-				LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
-						"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
-			}
+			LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
+					"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
 		}
 	}
 
@@ -289,8 +288,7 @@ public class NetworkStackThroughputITCase {
 			TestBaseWrapper test = new TestBaseWrapper(config);
 
 			System.out.println(Arrays.toString(p));
-			test.testJob();
-			test.calculateThroughput();
+			test.testProgram();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java b/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
deleted file mode 100644
index 0cff9b6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.testPrograms.util.tests;
-
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class IntTupleDataInFormatTest
-{
-	@Test
-	public void testReadLineKeyValuePairOfIntValueTupleByteArray() {
-		
-		String[] testTuples = {
-			"1|attribute1|attribute2|3|attribute4|5|",
-			"2|3|",
-			"3|attribute1|attribute2|",
-			"-1|attr1|attr2|",
-			"-2|attribute1|attribute2|",
-			Integer.MAX_VALUE+"|attr1|attr2|attr3|attr4|",
-			Integer.MIN_VALUE+"|attr1|attr2|attr3|attr4|"
-		};
-		
-		int[] expectedKeys = {
-			1,2,3,-1,-2,Integer.MAX_VALUE,Integer.MIN_VALUE
-		};
-		
-		int[] expectedAttrCnt = {6,2,3,3,3,5,5};
-
-		IntTupleDataInFormat inFormat = new IntTupleDataInFormat();
-		Record rec = new Record();	
-		
-		for(int i = 0; i < testTuples.length; i++) {
-			
-			byte[] tupleBytes = testTuples[i].getBytes();
-			
-			inFormat.readRecord(rec, tupleBytes, 0, tupleBytes.length);
-			
-			Assert.assertTrue("Expected Key: "+expectedKeys[i]+" != Returned Key: "+rec.getField(0, IntValue.class), rec.getField(0, IntValue.class).equals(new IntValue(expectedKeys[i])));
-			Assert.assertTrue("Expected Attr Cnt: "+expectedAttrCnt[i]+" != Returned Attr Cnt: "+rec.getField(1, Tuple.class), rec.getField(1, Tuple.class).getNumberOfColumns() == expectedAttrCnt[i]);
-		}
-	}
-}