Posted to commits@flink.apache.org by fh...@apache.org on 2015/11/24 18:17:48 UTC

[05/16] flink git commit: Remove ITCases for Record API operators

Remove ITCases for Record API operators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50ced5e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50ced5e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50ced5e3

Branch: refs/heads/master
Commit: 50ced5e3dca0983352ac9eb2ae5ba1a7de66f25b
Parents: 3333f84
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 21:13:59 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Nov 24 18:16:51 2015 +0100

----------------------------------------------------------------------
 .../broadcastvars/BroadcastBranchingITCase.java |   2 +-
 .../test/failingPrograms/TaskFailureITCase.java |   4 +-
 .../test/operators/CoGroupGroupSortITCase.java  | 122 ----------
 .../flink/test/operators/CoGroupITCase.java     | 202 ----------------
 .../flink/test/operators/CrossITCase.java       | 197 ----------------
 .../apache/flink/test/operators/JoinITCase.java | 182 --------------
 .../apache/flink/test/operators/MapITCase.java  | 133 -----------
 .../test/operators/MapPartitionITCase.java      | 100 --------
 .../flink/test/operators/ObjectReuseITCase.java | 236 -------------------
 .../flink/test/operators/ReduceITCase.java      | 171 --------------
 .../flink/test/operators/UnionITCase.java       | 182 --------------
 .../flink/test/operators/UnionSinkITCase.java   | 175 --------------
 .../operators/io/ContractITCaseIOFormats.java   |  85 -------
 .../test/util/ContractITCaseIOFormats.java      |  85 +++++++
 14 files changed, 88 insertions(+), 1788 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
index 4868aff..69be0ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseOutputFormat;
 import org.apache.flink.test.util.RecordAPITestBase;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index fe98e18..efb1c32 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
+import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseInputFormat;
+import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseOutputFormat;
 import org.apache.flink.test.util.FailingTestBase;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
deleted file mode 100644
index 5b78a51..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupGroupSortITCase extends JavaProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		
-		DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
-				new Tuple2<Long, Long>(0L, 5L),
-				new Tuple2<Long, Long>(0L, 4L),
-				new Tuple2<Long, Long>(0L, 3L),
-				new Tuple2<Long, Long>(0L, 2L),
-				new Tuple2<Long, Long>(0L, 1L),
-				new Tuple2<Long, Long>(1L, 10L),
-				new Tuple2<Long, Long>(1L, 8L),
-				new Tuple2<Long, Long>(1L, 9L),
-				new Tuple2<Long, Long>(1L, 7L));
-		
-		DataSet<TestPojo> input2 = env.fromElements(
-				new TestPojo(0L, 10L, 3L),
-				new TestPojo(0L, 8L, 3L),
-				new TestPojo(0L, 10L, 1L),
-				new TestPojo(0L, 9L, 0L),
-				new TestPojo(0L, 8L, 2L),
-				new TestPojo(0L, 8L, 4L),
-				new TestPojo(1L, 10L, 3L),
-				new TestPojo(1L, 8L, 3L),
-				new TestPojo(1L, 10L, 1L),
-				new TestPojo(1L, 9L, 0L),
-				new TestPojo(1L, 8L, 2L),
-				new TestPojo(1L, 8L, 4L));
-		
-		input1.coGroup(input2)
-		.where(1).equalTo("b")
-		.sortFirstGroup(0, Order.DESCENDING)
-		.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-		
-		.with(new ValidatingCoGroup())
-		.output(new DiscardingOutputFormat<NullValue>());
-		
-		env.execute();
-	}
-	
-	
-	private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
-
-		@Override
-		public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
-			// validate the tuple input, field 1, descending
-			{
-				long lastValue = Long.MAX_VALUE;
-				
-				for (Tuple2<Long, Long> t : first) {
-					long current = t.f1;
-					Assert.assertTrue(current <= lastValue);
-					lastValue = current;
-				}
-			}
-			
-			
-			// validate the pojo input
-			{
-				TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
-				
-				for (TestPojo current : second) {
-					Assert.assertTrue(current.c >= lastValue.c);
-					Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
-					
-					lastValue = current;
-				}
-			}
-			
-		}
-	}
-	
-	public static class TestPojo implements Cloneable {
-		public long a;
-		public long b;
-		public long c;
-		
-		
-		public TestPojo() {}
-		
-		public TestPojo(long a, long b, long c) {
-			this.a = a;
-			this.b = b;
-			this.c = c;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
deleted file mode 100644
index be05186..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class CoGroupITCase extends RecordAPITestBase {
-
-	String leftInPath = null;
-	String rightInPath = null;
-	String resultPath = null;
-
-	public CoGroupITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	private static final String LEFT_IN = "1 1\n2 2\n3 3\n4 4\n1 2\n2 3\n3 4\n4 5\n" +
-			"1 3\n2 4\n3 5\n4 6\n1 4\n2 5\n3 6\n4 7\n";
-
-	private static final String RIGHT_IN = "1 1\n2 2\n3 3\n5 1\n1 1\n2 2\n3 3\n6 1\n" +
-			"1 1\n2 2\n2 2\n7 1\n1 1\n2 2\n2 2\n8 1\n";
-
-	private static final String RESULT = "1 6\n2 2\n3 12\n4 22\n5 -1\n6 -1\n7 -1\n8 -1\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		leftInPath = createTempFile("left_in.txt", LEFT_IN);
-		rightInPath = createTempFile("right_in.txt", RIGHT_IN);
-		resultPath = getTempDirPath("result");
-	}
-
-	public static class CoGroupTestInFormat extends DelimitedInputFormat {
-		private static final long serialVersionUID = 1L;
-		
-		private final StringValue keyString = new StringValue();
-		private final StringValue valueString = new StringValue();
-		
-		@Override
-		public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-			this.keyString.setValueAscii(bytes, offset, 1);
-			this.valueString.setValueAscii(bytes, offset + 2, 1);
-			target.setField(0, keyString);
-			target.setField(1, valueString);
-
-			return target;
-		}
-
-	}
-
-	public static class CoGroupOutFormat extends FileOutputFormat {
-		private static final long serialVersionUID = 1L;
-		
-		private final StringBuilder buffer = new StringBuilder();
-		private final StringValue keyString = new StringValue();
-		private final IntValue valueInteger = new IntValue();
-		
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			this.buffer.setLength(0);
-			this.buffer.append(record.getField(0, keyString).toString());
-			this.buffer.append(' ');
-			this.buffer.append(record.getField(1, valueInteger).getValue());
-			this.buffer.append('\n');
-			
-			byte[] bytes = this.buffer.toString().getBytes();
-			this.stream.write(bytes);
-		}
-	}
-
-	public static class TestCoGrouper extends CoGroupFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private StringValue keyString = new StringValue();
-		private StringValue valueString = new StringValue();
-		
-		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-			
-			Record record = null;
-			int sum = 0;
-			
-			while (records1.hasNext()) {
-				record = records1.next();
-				keyString = record.getField(0, keyString);
-				valueString = record.getField(1, valueString);
-				sum += Integer.parseInt(valueString.getValue());
-			}
-			
-			
-			while (records2.hasNext()) {
-				record = records2.next();
-				keyString = record.getField(0, keyString);
-				valueString = record.getField(1, valueString);
-				sum -= Integer.parseInt(valueString.getValue());
-			}
-			record.setField(1, new IntValue(sum));
-			
-			out.collect(record);
-		}
-
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		FileDataSource input_left =  new FileDataSource(new CoGroupTestInFormat(), leftInPath);
-		DelimitedInputFormat.configureDelimitedFormat(input_left)
-			.recordDelimiter('\n');
-		input_left.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
-
-		FileDataSource input_right =  new FileDataSource(new CoGroupTestInFormat(), rightInPath);
-		DelimitedInputFormat.configureDelimitedFormat(input_right)
-			.recordDelimiter('\n');
-		input_right.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
-
-		CoGroupOperator testCoGrouper = CoGroupOperator.builder(new TestCoGrouper(), StringValue.class, 0, 0)
-			.build();
-		testCoGrouper.setParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1));
-		testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
-				config.getString("CoGroupTest#LocalStrategy", ""));
-		testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
-				config.getString("CoGroupTest#ShipStrategy", ""));
-
-		FileDataSink output = new FileDataSink(new CoGroupOutFormat(), resultPath);
-		output.setParallelism(1);
-
-		output.setInput(testCoGrouper);
-		testCoGrouper.setFirstInput(input_left);
-		testCoGrouper.setSecondInput(input_right);
-
-		return new Plan(output);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
-
-		String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
-
-		for (String localStrategy : localStrategies) {
-			for (String shipStrategy : shipStrategies) {
-
-				Configuration config = new Configuration();
-				config.setString("CoGroupTest#LocalStrategy", localStrategy);
-				config.setString("CoGroupTest#ShipStrategy", shipStrategy);
-				config.setInteger("CoGroupTest#NoSubtasks", 4);
-
-				tConfigs.add(config);
-			}
-		}
-
-		return toParameterList(tConfigs);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
deleted file mode 100644
index 3fde5a9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class CrossITCase extends RecordAPITestBase {
-
-	private String leftInPath = null;
-	private String rightInPath = null;
-	private String resultPath = null;
-
-	public CrossITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	//private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 3\n4 4\n";
-
-	//private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 4\n4 8\n";
-
-	//private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
-	//	+ "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
-	//	+ "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 -1\n7 -1\n8 -1\n8 -1\n";
-
-	//private static final String RESULT = "10 1\n10 1\n10 5\n10 5\n4 1\n4 1\n4 2\n4 2\n5 0\n5 0\n5 1\n," +
-	//		"5 1\n5 2\n5 2\n5 4\n5 4\n6 -1\n6 -1\n6 0\n6 0\n6 1\n6 1\n6 3\n6 3\n6 3\n6 3\n6 6\n6 6\n7 -1\n" +
-	//		"7 -1\n7 -2\n7 -2\n7 0\n7 0\n7 2\n7 2\n7 2\n7 2\n7 4\n7 4\n7 5\n7 5\n7 8\n7 8\n8 -1\n8 -1\n8 1\n" +
-	//		"8 1\n8 1\n8 1\n8 3\n8 3\n8 4\n8 4\n8 7\n8 7\n9 0\n9 0\n9 2\n9 2\n9 3\n9 3\n9 6\n9 6\n";
-
-	//private static final String RESULT = "2 2\n4 4\n1 1\n3 3\n2 2\n4 4\n1 1\n3 3\n5 0\n5 1\n6 1\n 6 3\n" +
-	//		"7 2\n7 5\n8 3\n8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n" +
-	//		"6 6\n7 4\n7 8\n6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6\n5 0\n5 1\n6 1\n6 3\n7 2\n7 5\n 8 3\n" +
-	//		"8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n6 6\n7 4\n7 8\n" +
-	//		"6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6";
-
-
-	private static final String LEFT_IN = "1 1\n2 2\n3 3\n";
-	private static final String RIGHT_IN = "3 6\n4 4\n4 8\n";
-
-	private static final String RESULT = "6 6\n7 5\n7 8\n7 4\n8 3\n8 7\n8 4\n9 2\n9 6\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		leftInPath = createTempFile("left_in.txt", LEFT_IN);
-		rightInPath = createTempFile("right_in.txt", RIGHT_IN);
-		resultPath = getTempDirPath("result");
-	}
-
-
-	public static class TestCross extends CrossFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private StringValue string = new StringValue();
-		private IntValue integer = new IntValue();
-		
-		@Override
-		public Record cross(Record record1, Record record2) throws Exception {
-			string = record1.getField(1, string);
-			int val1 = Integer.parseInt(string.toString());
-			string = record2.getField(1, string);
-			int val2 = Integer.parseInt(string.toString());
-			string = record1.getField(0, string);
-			int key1 = Integer.parseInt(string.toString());
-			string = record2.getField(0, string);
-			int key2 = Integer.parseInt(string.toString());
-
-			string.setValue((key1 + key2 + 2) + "");
-			integer.setValue(val2 - val1 + 1);
-
-			record1.setField(0, string);
-			record1.setField(1, integer);
-
-			return record1;
-		}
-
-	}
-
-	@Override
-	protected Plan getTestJob() {
-
-		FileDataSource input_left = new FileDataSource(
-				new ContractITCaseInputFormat(), leftInPath);
-		DelimitedInputFormat.configureDelimitedFormat(input_left)
-			.recordDelimiter('\n');
-		input_left.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-
-		FileDataSource input_right = new FileDataSource(
-				new ContractITCaseInputFormat(), rightInPath);
-		DelimitedInputFormat.configureDelimitedFormat(input_right)
-			.recordDelimiter('\n');
-		input_right.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-
-		CrossOperator testCross = CrossOperator.builder(new TestCross()).build();
-		testCross.setParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-		testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
-				config.getString("CrossTest#LocalStrategy", ""));
-		if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
-			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
-			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
-		} else if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_SECOND")) {
-			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
-			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
-		} else {
-			testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
-					config.getString("CrossTest#ShipStrategy", ""));
-		}
-
-		FileDataSink output = new FileDataSink(
-				new ContractITCaseOutputFormat(), resultPath);
-		output.setParallelism(1);
-
-		output.setInput(testCross);
-		testCross.setFirstInput(input_left);
-		testCross.setSecondInput(input_right);
-
-		return new Plan(output);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST,
-				Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND,
-				Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST,
-				Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND };
-
-		String[] shipStrategies = { "BROADCAST_FIRST", "BROADCAST_SECOND"
-		// PactCompiler.HINT_SHIP_STRATEGY_BROADCAST
-		// PactCompiler.HINT_SHIP_STRATEGY_SFR
-		};
-
-		for (String localStrategy : localStrategies) {
-			for (String shipStrategy : shipStrategies) {
-
-				Configuration config = new Configuration();
-				config.setString("CrossTest#LocalStrategy", localStrategy);
-				config.setString("CrossTest#ShipStrategy", shipStrategy);
-				config.setInteger("CrossTest#NoSubtasks", 4);
-
-				tConfigs.add(config);
-			}
-		}
-
-		return toParameterList(tConfigs);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
deleted file mode 100644
index c2ec55a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class JoinITCase extends RecordAPITestBase {
-
-	private static final Logger LOG = LoggerFactory.getLogger(JoinITCase.class);
-
-	String leftInPath = null;
-	String rightInPath = null;
-	String resultPath = null;
-
-	public JoinITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	private static final String LEFT_IN = "1 1\n2 2\n3 3\n4 4\n1 2\n2 3\n3 4\n4 5\n" +
-			"1 3\n2 4\n3 5\n4 6\n1 4\n2 5\n3 6\n4 7\n";
-
-	private static final String RIGHT_IN = "1 1\n2 2\n3 3\n5 1\n1 1\n2 2\n3 3\n6 1\n" +
-			"1 1\n2 2\n2 2\n7 1\n1 1\n2 2\n2 2\n8 1\n";
-
-	private static final String RESULT = "2 1\n2 1\n2 1\n2 1\n2 2\n2 2\n2 2\n2 2\n2 3\n2 3\n2 3\n2 3\n2 4\n2 4\n2 4\n2 4\n"
-		+ "4 1\n4 1\n4 2\n4 2\n4 3\n4 3\n4 4\n4 4\n"
-		+ "3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n3 1\n3 2\n3 3\n3 4\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		leftInPath = createTempFile("left_in.txt", LEFT_IN);
-		rightInPath = createTempFile("right_in.txt", RIGHT_IN);
-		resultPath = getTempDirPath("result");
-	}
-
-	public static class TestMatcher extends JoinFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private StringValue keyString = new StringValue();
-		private StringValue valueString = new StringValue();
-		
-		@Override
-		public void join(Record value1, Record value2, Collector<Record> out)
-				throws Exception {
-			keyString = value1.getField(0, keyString);
-			keyString.setValue(""+ (Integer.parseInt(keyString.getValue())+1));
-			value1.setField(0, keyString);
-			valueString = value1.getField(1, valueString);
-			int val1 = Integer.parseInt(valueString.getValue())+2;
-			valueString = value2.getField(1, valueString);
-			int val2 = Integer.parseInt(valueString.getValue())+1;
-			
-			value1.setField(1, new IntValue(val1 - val2));
-			
-			out.collect(value1);
-			
-			if (LOG.isDebugEnabled())
-				LOG.debug("Processed: [" + keyString.toString() + "," + val1 + "] + " +
-						"[" + keyString.toString() + "," + val2 + "]");
-		}
-
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		FileDataSource input_left = new FileDataSource(
-				new ContractITCaseInputFormat(), leftInPath);
-		DelimitedInputFormat.configureDelimitedFormat(input_left)
-			.recordDelimiter('\n');
-		input_left.setParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
-
-		FileDataSource input_right = new FileDataSource(
-				new ContractITCaseInputFormat(), rightInPath);
-		DelimitedInputFormat.configureDelimitedFormat(input_right)
-			.recordDelimiter('\n');
-		input_right.setParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
-
-		JoinOperator testMatcher = JoinOperator.builder(new TestMatcher(), StringValue.class, 0, 0)
-			.build();
-		testMatcher.setParallelism(config.getInteger("MatchTest#NoSubtasks", 1));
-		testMatcher.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
-				config.getString("MatchTest#LocalStrategy", ""));
-		if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_FIRST")) {
-			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
-			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
-		} else if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_SECOND")) {
-			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_FORWARD);
-			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
-					Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
-		} else {
-			testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
-					config.getString("MatchTest#ShipStrategy", ""));
-		}
-
-		FileDataSink output = new FileDataSink(
-				new ContractITCaseOutputFormat(), resultPath);
-		output.setParallelism(1);
-
-		output.setInput(testMatcher);
-		testMatcher.setFirstInput(input_left);
-		testMatcher.setSecondInput(input_right);
-
-		return new Plan(output);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE,
-				Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND };
-
-		String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", "BROADCAST_SECOND"};
-
-		for (String localStrategy : localStrategies) {
-			for (String shipStrategy : shipStrategies) {
-
-				Configuration config = new Configuration();
-				config.setString("MatchTest#LocalStrategy", localStrategy);
-				config.setString("MatchTest#ShipStrategy", shipStrategy);
-				config.setInteger("MatchTest#NoSubtasks", 4);
-
-				tConfigs.add(config);
-			}
-		}
-
-		return toParameterList(tConfigs);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
deleted file mode 100644
index 28b9501..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class MapITCase extends RecordAPITestBase {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(MapITCase.class);
-
-	String inPath = null;
-	String resultPath = null;
-	
-	public MapITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
-			"1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" +
-			"5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inPath = createTempFile("in.txt", IN);
-		resultPath = getTempDirPath("result");
-	}
-
-	public static class TestMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private StringValue keyString = new StringValue();
-		private StringValue valueString = new StringValue();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			keyString = record.getField(0, keyString);
-			valueString = record.getField(1, valueString);
-			
-			LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
-			
-			if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
-				record.setField(0, valueString);
-				record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-				
-				out.collect(record);
-			}
-			
-		}
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		FileDataSource input = new FileDataSource(
-				new ContractITCaseInputFormat(), inPath);
-		DelimitedInputFormat.configureDelimitedFormat(input)
-			.recordDelimiter('\n');
-		input.setParallelism(config.getInteger("MapTest#NoSubtasks", 1));
-
-		MapOperator testMapper = MapOperator.builder(new TestMapper()).build();
-		testMapper.setParallelism(config.getInteger("MapTest#NoSubtasks", 1));
-
-		FileDataSink output = new FileDataSink(
-				new ContractITCaseOutputFormat(), resultPath);
-		output.setParallelism(1);
-
-		output.setInput(testMapper);
-		testMapper.setInput(input);
-
-		return new Plan(output);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-		LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
-
-		Configuration config = new Configuration();
-		config.setInteger("MapTest#NoSubtasks", 4);
-		testConfigs.add(config);
-
-		return toParameterList(testConfigs);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
deleted file mode 100644
index 3cb5580..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapPartitionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class MapPartitionITCase extends JavaProgramTestBase {
-
-	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n"
-			+ "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n"
-			+ "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-	
-	
-	private List<Tuple2<String, String>> input = new ArrayList<Tuple2<String,String>>();
-	
-	private List<Tuple2<String, Integer>> expected = new ArrayList<Tuple2<String,Integer>>();
-	
-	private List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String,Integer>>();
-	
-
-	@Override
-	protected void preSubmit() throws Exception {
-
-		// create input
-		for (String s :IN.split("\n")) {
-			String[] fields = s.split(" ");
-			input.add(new Tuple2<String, String>(fields[0], fields[1]));
-		}
-		
-		// create expected
-		for (String s : RESULT.split("\n")) {
-			String[] fields = s.split(" ");
-			expected.add(new Tuple2<String, Integer>(fields[0], Integer.parseInt(fields[1])));
-		}
-		
-	}
-	
-	@Override
-	protected void postSubmit() {
-		compareResultCollections(expected, result, new TupleComparator<Tuple2<String, Integer>>());
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Tuple2<String, String>> data = env.fromCollection(input);
-		
-		data.mapPartition(new TestMapPartition()).output(new LocalCollectionOutputFormat<Tuple2<String,Integer>>(result));
-		
-		env.execute();
-	}
-	
-	
-	public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
-
-		@Override
-		public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<Tuple2<String, Integer>> out) {
-			for (Tuple2<String, String> value : values) {
-				String keyString = value.f0;
-				String valueString = value.f1;
-				
-				int keyInt = Integer.parseInt(keyString);
-				int valueInt = Integer.parseInt(valueString);
-
-				if (keyInt + valueInt < 10) {
-					out.collect(new Tuple2<String, Integer>(valueString, keyInt + 10));
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
deleted file mode 100644
index faf6de5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * These tests check whether the object-reuse execution mode really reuses objects.
- */
-@RunWith(Parameterized.class)
-public class ObjectReuseITCase extends JavaProgramTestBase {
-
-	private static int NUM_PROGRAMS = 4;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-
-	private static String inReducePath;
-	private static String inGroupReducePath;
-
-	private String IN_REDUCE = "a,1\na,2\na,3\na,4\na,50\n";
-	private String IN_GROUP_REDUCE = "a,1\na,2\na,3\na,4\na,5\n";
-
-	public ObjectReuseITCase(Configuration config) {
-		super(config);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		inReducePath = createTempFile("in_reduce.txt", IN_REDUCE);
-		inGroupReducePath = createTempFile("in_group_reduce.txt", IN_GROUP_REDUCE);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = Progs.runProgram(curProgId, resultPath);
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Override
-	protected boolean skipCollectionExecution() {
-		return true;
-	}
-
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
-	}
-	
-	@SuppressWarnings({"unchecked", "serial"})
-	private static class Progs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-
-			case 1: {
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
-
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
-				DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws
-							Exception {
-						value2.f1 += value1.f1;
-						return value2;
-					}
-
-				});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,100\n";
-
-			}
-
-			case 2: {
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
-
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inReducePath).types(String.class, Integer.class).setParallelism(1);
-
-				DataSet<Tuple2<String, Integer>> result = input
-						.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-
-							@Override
-							public Tuple2<String, Integer> reduce(
-									Tuple2<String, Integer> value1,
-									Tuple2<String, Integer> value2) throws Exception {
-								value2.f1 += value1.f1;
-								return value2;
-							}
-
-						});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,100\n";
-
-			}
-
-			case 3: {
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
-
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
-
-				DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
-					@Override
-					public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-						List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
-						for (Tuple2<String, Integer> val : values) {
-							list.add(val);
-						}
-
-						for (Tuple2<String, Integer> val : list) {
-							out.collect(val);
-						}
-					}
-
-				});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,4\n" +
-						"a,4\n" +
-						"a,5\n" +
-						"a,5\n" +
-						"a,5\n";
-
-			}
-
-			case 4: {
-
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				env.getConfig().enableObjectReuse();
-
-				DataSet<Tuple2<String, Integer>> input = env.readCsvFile(inGroupReducePath).types(String.class, Integer.class).setParallelism(1);
-
-				DataSet<Tuple2<String, Integer>> result = input.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
-					@Override
-					public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-						List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
-						for (Tuple2<String, Integer> val : values) {
-							list.add(val);
-						}
-
-						for (Tuple2<String, Integer> val : list) {
-							out.collect(val);
-						}
-					}
-
-				});
-
-				result.writeAsCsv(resultPath);
-				env.execute();
-
-				// return expected result
-				return "a,4\n" +
-						"a,4\n" +
-						"a,5\n" +
-						"a,5\n" +
-						"a,5\n";
-
-			}
-
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-	
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
deleted file mode 100644
index f5511c8..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class ReduceITCase extends RecordAPITestBase {
-
-	String inPath = null;
-	String resultPath = null;
-
-	public ReduceITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n1 1\n" +
-			"2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n5 9\n7 7\n8 8\n" +
-			"1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String RESULT = "1 4\n2 18\n3 0\n4 28\n5 27\n6 15\n7 21\n8 32\n9 1\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inPath = createTempFile("in.txt", IN);
-		resultPath = getTempDirPath("result");
-	}
-
-	@ReduceOperator.Combinable
-	public static class TestReducer extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		private StringValue reduceValue = new StringValue();
-		private StringValue combineValue = new StringValue();
-
-		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) {
-			Record record = null;
-			int sum = 0;
-			
-			while (records.hasNext()) {
-				record = records.next();
-				combineValue = record.getField(1, combineValue);
-				sum += Integer.parseInt(combineValue.toString());
-			}
-			combineValue.setValue(sum + "");
-			record.setField(1, combineValue);
-			out.collect(record);
-		}
-
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) {
-			Record record = null;
-			int sum = 0;
-			
-			while (records.hasNext()) {
-				record = records.next();
-				reduceValue = record.getField(1, reduceValue);
-				sum += Integer.parseInt(reduceValue.toString());
-			}
-			record.setField(1, new IntValue(sum));
-			out.collect(record);
-		}
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		FileDataSource input = new FileDataSource(
-				new ContractITCaseInputFormat(), inPath);
-		DelimitedInputFormat.configureDelimitedFormat(input)
-			.recordDelimiter('\n');
-		input.setParallelism(config.getInteger("ReduceTest#NoSubtasks", 1));
-
-		ReduceOperator testReducer = ReduceOperator.builder(new TestReducer(), StringValue.class, 0)
-			.build();
-		testReducer.setParallelism(config.getInteger("ReduceTest#NoSubtasks", 1));
-		testReducer.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
-				config.getString("ReduceTest#LocalStrategy", ""));
-		testReducer.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
-				config.getString("ReduceTest#ShipStrategy", ""));
-
-		FileDataSink output = new FileDataSink(
-				new ContractITCaseOutputFormat(), resultPath);
-		output.setParallelism(1);
-
-		output.setInput(testReducer);
-		testReducer.setInput(input);
-
-		Plan plan = new Plan(output);
-		plan.setExecutionConfig(new ExecutionConfig());
-		Optimizer pc = new Optimizer(new DataStatistics(), this.config);
-		OptimizedPlan op = pc.compile(plan);
-
-		JobGraphGenerator jgg = new JobGraphGenerator();
-		return jgg.compileJobGraph(op);
-
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(RESULT, resultPath);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT };
-		String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
-
-		for (String localStrategy : localStrategies) {
-			for (String shipStrategy : shipStrategies) {
-
-				Configuration config = new Configuration();
-				config.setString("ReduceTest#LocalStrategy", localStrategy);
-				config.setString("ReduceTest#ShipStrategy", shipStrategy);
-				config.setInteger("ReduceTest#NoSubtasks", 4);
-				tConfigs.add(config);
-			}
-		}
-
-		return toParameterList(tConfigs);
-	}
-}
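
For reference, the behavior exercised by the removed ReduceITCase above (a per-key sum of the second field, grouped on the first, with a combiner) translates directly to the DataSet API. The following is a minimal sketch, not part of this commit; the class name GroupedSumExample and the input path are hypothetical, and the input is assumed to be the same space-delimited "key value" lines as the IN constant above.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class GroupedSumExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Parse "key value" lines into (key, value) pairs.
            DataSet<Tuple2<Integer, Integer>> pairs = env
                .readTextFile("file:///tmp/in.txt") // hypothetical input path
                .map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(String line) {
                        String[] parts = line.split(" ");
                        return new Tuple2<Integer, Integer>(
                                Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
                    }
                });

            // Per-key sum, covering the same contract as TestReducer above
            // (e.g. key 1 appears with values 1,1,1,1 and sums to 4).
            pairs.groupBy(0).sum(1).print();
        }
    }

The runtime chains a combiner into sum(1) on its own, so no hand-written combine() is needed where the Record API reducer had to be marked @Combinable.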

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
deleted file mode 100644
index b833421..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class UnionITCase extends RecordAPITestBase {
-	private static final Logger LOG = LoggerFactory.getLogger(UnionITCase.class);
-
-	String inPath = null;
-	String emptyInPath = null;
-	String resultPath = null;
-	
-	public UnionITCase(Configuration testConfig) {
-		super(testConfig);
-	}
-
-	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n1 1\n" +
-			"2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n5 9\n7 7\n8 8\n" +
-			"1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n" +
-			"4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
-	private static final String EMPTY_RESULT = "";
-	
-	private static final String MAP_RESULT_TWICE = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n" +
-												"1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		inPath = createTempFile("in.txt", IN);
-		emptyInPath = createTempFile("empty_in.txt", "");
-		resultPath = getTempDirPath("result");
-	}
-
-	public static class TestMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private StringValue keyString = new StringValue();
-		private StringValue valueString = new StringValue();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			keyString = record.getField(0, keyString);
-			valueString = record.getField(1, valueString);
-			
-			if (LOG.isDebugEnabled())
-				LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]");
-			
-			if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
-				record.setField(0, valueString);
-				record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-				
-				out.collect(record);
-			}
-			
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected Plan getTestJob() {
-		String input1Path = config.getString("UnionTest#Input1Path", "").equals("empty") ? emptyInPath : inPath;
-		String input2Path = config.getString("UnionTest#Input2Path", "").equals("empty") ? emptyInPath : inPath;
-
-		FileDataSource input1 = new FileDataSource(
-			new ContractITCaseInputFormat(), input1Path);
-		DelimitedInputFormat.configureDelimitedFormat(input1)
-			.recordDelimiter('\n');
-		input1.setParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
-		
-		FileDataSource input2 = new FileDataSource(
-				new ContractITCaseInputFormat(), input2Path);
-		DelimitedInputFormat.configureDelimitedFormat(input2)
-			.recordDelimiter('\n');
-		input2.setParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
-		
-		MapOperator testMapper = MapOperator.builder(new TestMapper()).build();
-		testMapper.setParallelism(config.getInteger("UnionTest#NoSubtasks", 1));
-
-		FileDataSink output = new FileDataSink(
-				new ContractITCaseOutputFormat(), resultPath);
-		output.setParallelism(1);
-
-		output.setInput(testMapper);
-
-		testMapper.addInput(input1);
-		testMapper.addInput(input2);
-
-		return new Plan(output);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(config.getString("UnionTest#ExpectedResult", ""), resultPath);
-
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws IOException {
-		LinkedList<Configuration> testConfigs = new LinkedList<Configuration>();
-
-		//second input empty
-		Configuration config = new Configuration();
-		config.setInteger("UnionTest#NoSubtasks", 4);
-		config.setString("UnionTest#ExpectedResult", RESULT);
-		config.setString("UnionTest#Input1Path", "non-empty");
-		config.setString("UnionTest#Input2Path", "empty");
-		testConfigs.add(config);
-		
-		
-		//first input empty
-		config = new Configuration();
-		config.setInteger("UnionTest#NoSubtasks", 4);
-		config.setString("UnionTest#ExpectedResult", RESULT);
-		config.setString("UnionTest#Input1Path", "empty");
-		config.setString("UnionTest#Input2Path", "non-empty");
-		testConfigs.add(config);
-		
-		//both inputs full
-		config = new Configuration();
-		config.setInteger("UnionTest#NoSubtasks", 4);
-		config.setString("UnionTest#ExpectedResult", MAP_RESULT_TWICE);
-		config.setString("UnionTest#Input1Path", "non-empty");
-		config.setString("UnionTest#Input2Path", "non-empty");
-		testConfigs.add(config);
-		
-		//both inputs empty
-		config = new Configuration();
-		config.setInteger("UnionTest#NoSubtasks", 4);
-		config.setString("UnionTest#ExpectedResult", EMPTY_RESULT);
-		config.setString("UnionTest#Input1Path", "empty");
-		config.setString("UnionTest#Input2Path", "empty");
-		testConfigs.add(config);
-
-		return toParameterList(testConfigs);
-	}
-}
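
The union-then-map pattern that the removed UnionITCase covered looks as follows in the DataSet API. A minimal sketch, assuming two small inline inputs; the class name UnionExample and the element values are illustrative, not taken from the test data.

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class UnionExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Integer, Integer>> first = env.fromElements(
                new Tuple2<Integer, Integer>(1, 1), new Tuple2<Integer, Integer>(2, 8));
            DataSet<Tuple2<Integer, Integer>> second = env.fromElements(
                new Tuple2<Integer, Integer>(4, 4), new Tuple2<Integer, Integer>(7, 7));

            // Union both inputs, then apply the same rule as the removed
            // TestMapper: if key + value < 10, emit (value, key + 10).
            first.union(second)
                .flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public void flatMap(Tuple2<Integer, Integer> in,
                            Collector<Tuple2<Integer, Integer>> out) {
                        if (in.f0 + in.f1 < 10) {
                            out.collect(new Tuple2<Integer, Integer>(in.f1, in.f0 + 10));
                        }
                    }
                })
                .print();
        }
    }

An empty input on either side simply contributes no elements to the union, which is what the empty/non-empty parameterizations of the removed test asserted.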

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
deleted file mode 100644
index 35cb8af..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
-import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class UnionSinkITCase extends RecordAPITestBase {
-	
-	public UnionSinkITCase(Configuration testConfig) {
-		super(testConfig);
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	private static final String MAP_IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" +
-	                                     "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n" +
-	                                     "1 1\n2 2\n2 2\n3 0\n4 4\n5 9\n7 7\n8 8\n" +
-	                                     "1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String MAP_RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-
-	private static final String EMPTY_MAP_RESULT = "";
-	
-	private static final String MAP_RESULT_TWICE = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n" +
-												"1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-	
-	private String textInput;
-	
-	private String emptyInput;
-	
-	private String resultDir;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		textInput = createTempFile("textdata.txt", MAP_IN);
-		emptyInput = createTempFile("emptyfile.txt", "");
-		resultDir = getTempDirPath("result");
-	}
-
-	public static class TestMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private StringValue keyString = new StringValue();
-		private StringValue valueString = new StringValue();
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			keyString = record.getField(0, keyString);
-			valueString = record.getField(1, valueString);
-			
-			
-			if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) {
-
-				record.setField(0, valueString);
-				record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10));
-				
-				out.collect(record);
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		
-		String path1 = config.getBoolean("input1PathHasData", false) ? textInput : emptyInput;
-		String path2 = config.getBoolean("input2PathHasData", false) ? textInput : emptyInput;
-		
-		FileDataSource input1 = new FileDataSource(new ContractITCaseInputFormat(), path1);
-		FileDataSource input2 = new FileDataSource(new ContractITCaseInputFormat(), path2);
-		
-		MapOperator testMapper1 = MapOperator.builder(new TestMapper()).build();
-		MapOperator testMapper2 = MapOperator.builder(new TestMapper()).build();
-
-		FileDataSink output = new FileDataSink(new ContractITCaseOutputFormat(), resultDir);
-
-		testMapper1.setInput(input1);
-		testMapper2.setInput(input2);
-
-		output.addInput(testMapper1);
-		output.addInput(testMapper2);
-		
-		Plan plan = new Plan(output);
-		plan.setExecutionConfig(new ExecutionConfig());
-		plan.setDefaultParallelism(parallelism);
-
-		Optimizer pc = new Optimizer(new DataStatistics(), this.config);
-		OptimizedPlan op = pc.compile(plan);
-
-		JobGraphGenerator jgg = new JobGraphGenerator();
-		return jgg.compileJobGraph(op);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		String expectedResult = config.getString("UnionTest#ExpectedResult", null);
-		if (expectedResult == null) {
-			throw new Exception("Test corrupt, no expected return data set.");
-		}
-		compareResultsByLinesInMemory(expectedResult, resultDir);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-
-		//second input empty
-		Configuration config1 = new Configuration();
-		config1.setString("UnionTest#ExpectedResult", MAP_RESULT);
-		config1.setBoolean("input1PathHasData", true);
-		config1.setBoolean("input2PathHasData", false);
-		
-		
-		//first input empty
-		Configuration config2 = new Configuration();
-		config2.setString("UnionTest#ExpectedResult", MAP_RESULT);
-		config2.setBoolean("input1PathHasData", false);
-		config2.setBoolean("input2PathHasData", true);
-		
-		//both inputs full
-		Configuration config3 = new Configuration();
-		config3.setString("UnionTest#ExpectedResult", MAP_RESULT_TWICE);
-		config3.setBoolean("input1PathHasData", true);
-		config3.setBoolean("input2PathHasData", true);
-		
-		//both inputs empty
-		Configuration config4 = new Configuration();
-		config4.setString("UnionTest#ExpectedResult", EMPTY_MAP_RESULT);
-		config4.setBoolean("input1PathHasData", false);
-		config4.setBoolean("input2PathHasData", false);
-
-		return toParameterList(config1, config2, config3, config4);
-	}
-}
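
UnionSinkITCase covered the implicit union a Record API sink performed when it was given several inputs via addInput(). In the DataSet API that union has to be spelled out before the sink. A minimal sketch with a hypothetical class name, inline inputs, and output path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class UnionSinkExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> branchA = env.fromElements("1 11", "2 12");
            DataSet<String> branchB = env.fromElements("4 14", "3 16");

            // Explicit union replacing the Record API's multi-input sink.
            branchA.union(branchB).writeAsText("file:///tmp/result"); // hypothetical path
            env.execute("union sink sketch");
        }
    }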

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java b/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java
deleted file mode 100644
index 7649299..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/io/ContractITCaseIOFormats.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.operators.io;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-
-public class ContractITCaseIOFormats {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ContractITCaseIOFormats.class);
-	
-	public static class ContractITCaseInputFormat extends DelimitedInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		private final StringValue keyString = new StringValue();
-		private final StringValue valueString = new StringValue();
-		
-		@Override
-		public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-			this.keyString.setValueAscii(bytes, offset, 1);
-			this.valueString.setValueAscii(bytes, offset + 2, 1);
-			target.setField(0, keyString);
-			target.setField(1, valueString);
-			
-			if (LOG.isDebugEnabled())
-				LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]");
-			
-			return target;
-		}
-	}
-
-	public static class ContractITCaseOutputFormat extends FileOutputFormat {
-		private static final long serialVersionUID = 1L;
-		
-		private final StringBuilder buffer = new StringBuilder();
-		private final StringValue keyString = new StringValue();
-		private final IntValue valueInteger = new IntValue();
-		
-		
-		public ContractITCaseOutputFormat() {
-			setWriteMode(WriteMode.OVERWRITE);
-		}
-		
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			this.buffer.setLength(0);
-			this.buffer.append(record.getField(0, keyString).toString());
-			this.buffer.append(' ');
-			this.buffer.append(record.getField(1, valueInteger).getValue());
-			this.buffer.append('\n');
-			
-			byte[] bytes = this.buffer.toString().getBytes();
-			
-			if (LOG.isDebugEnabled())
-				LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]");
-			
-			this.stream.write(bytes);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/50ced5e3/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java b/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java
new file mode 100644
index 0000000..97aaaab
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/ContractITCaseIOFormats.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.util;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.java.record.io.DelimitedInputFormat;
+import org.apache.flink.api.java.record.io.FileOutputFormat;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+
+public class ContractITCaseIOFormats {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContractITCaseIOFormats.class);
+
+	public static class ContractITCaseInputFormat extends DelimitedInputFormat {
+		private static final long serialVersionUID = 1L;
+
+		private final StringValue keyString = new StringValue();
+		private final StringValue valueString = new StringValue();
+
+		@Override
+		public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
+			this.keyString.setValueAscii(bytes, offset, 1);
+			this.valueString.setValueAscii(bytes, offset + 2, 1);
+			target.setField(0, keyString);
+			target.setField(1, valueString);
+
+			if (LOG.isDebugEnabled())
+				LOG.debug("Read in: [" + keyString.getValue() + "," + valueString.getValue() + "]");
+
+			return target;
+		}
+	}
+
+	public static class ContractITCaseOutputFormat extends FileOutputFormat {
+		private static final long serialVersionUID = 1L;
+
+		private final StringBuilder buffer = new StringBuilder();
+		private final StringValue keyString = new StringValue();
+		private final IntValue valueInteger = new IntValue();
+
+
+		public ContractITCaseOutputFormat() {
+			setWriteMode(WriteMode.OVERWRITE);
+		}
+
+		@Override
+		public void writeRecord(Record record) throws IOException {
+			this.buffer.setLength(0);
+			this.buffer.append(record.getField(0, keyString).toString());
+			this.buffer.append(' ');
+			this.buffer.append(record.getField(1, valueInteger).getValue());
+			this.buffer.append('\n');
+
+			byte[] bytes = this.buffer.toString().getBytes();
+
+			if (LOG.isDebugEnabled())
+				LOG.debug("Writing out: [" + keyString.toString() + "," + valueInteger.getValue() + "]");
+
+			this.stream.write(bytes);
+		}
+	}
+}
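
The class bodies are unchanged by the move; only the package differs, so the remaining Record API tests pick the formats up from org.apache.flink.test.util. A minimal sketch of wiring them into a plan, using only operators that appear in this commit; the class name MovedFormatsUsage and the paths are hypothetical.

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.java.record.operators.FileDataSink;
    import org.apache.flink.api.java.record.operators.FileDataSource;
    import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseInputFormat;
    import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseOutputFormat;

    public class MovedFormatsUsage {
        // Builds a trivial source-to-sink plan around the relocated formats.
        public static Plan buildPlan(String inPath, String outPath) {
            FileDataSource source = new FileDataSource(new ContractITCaseInputFormat(), inPath);
            FileDataSink sink = new FileDataSink(new ContractITCaseOutputFormat(), outPath);
            sink.setInput(source);
            return new Plan(sink);
        }
    }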