Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:13 UTC

[13/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
deleted file mode 100644
index fb3e589..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Assert;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class MapITCase extends MultipleProgramsTestBase {
-
-	public MapITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testIdentityMapWithBasicType() throws Exception {
-		/*
-		 * Test identity map with basic type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		DataSet<String> identityMapDs = ds.
-				map(new Mapper1());
-
-		List<String> result = identityMapDs.collect();
-
-		String expected = "Hi\n" +
-				"Hello\n" +
-				"Hello world\n" +
-				"Hello world, how are you?\n" +
-				"I am fine.\n" +
-				"Luke Skywalker\n" +
-				"Random comment\n" +
-				"LOL\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRuntimeContextAndExecutionConfigParams() throws Exception {
-		/*
-		 * Test that the runtime context exposes the execution config parameters set on the environment
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().setNumberOfExecutionRetries(1000);
-		env.getConfig().setTaskCancellationInterval(50000);
-
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		DataSet<String> identityMapDs = ds.
-			map(new RichMapFunction<String, String>() {
-				@Override
-				public String map(String value) throws Exception {
-					Assert.assertTrue(1000 == getRuntimeContext().getExecutionConfig().getNumberOfExecutionRetries());
-					Assert.assertTrue(50000 == getRuntimeContext().getExecutionConfig().getTaskCancellationInterval());
-					return value;
-				}
-			});
-
-		List<String> result = identityMapDs.collect();
-
-		String expected = "Hi\n" +
-			"Hello\n" +
-			"Hello world\n" +
-			"Hello world, how are you?\n" +
-			"I am fine.\n" +
-			"Luke Skywalker\n" +
-			"Random comment\n" +
-			"LOL\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Mapper1 implements MapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(String value) throws Exception {
-			return value;
-		}
-	}
-
-	@Test
-	public void testIdentityMapWithTuple() throws Exception {
-		/*
-		 * Test identity map with a tuple
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds.
-				map(new Mapper2());
-
-		List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n" +
-				"6,3,Luke Skywalker\n" +
-				"7,4,Comment#1\n" +
-				"8,4,Comment#2\n" +
-				"9,4,Comment#3\n" +
-				"10,4,Comment#4\n" +
-				"11,5,Comment#5\n" +
-				"12,5,Comment#6\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" +
-				"15,5,Comment#9\n" +
-				"16,6,Comment#10\n" +
-				"17,6,Comment#11\n" +
-				"18,6,Comment#12\n" +
-				"19,6,Comment#13\n" +
-				"20,6,Comment#14\n" +
-				"21,6,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Mapper2 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
-				throws Exception {
-			return value;
-		}
-	}
-
-	@Test
-	public void testTypeConversionMapperCustomToTuple() throws Exception {
-		/*
-		 * Test type conversion mapper (Custom -> Tuple)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> typeConversionMapDs = ds.
-				map(new Mapper3());
-
-		List<Tuple3<Integer, Long, String>> result = typeConversionMapDs.collect();
-
-		String expected = "1,0,Hi\n" +
-				"2,1,Hello\n" +
-				"2,2,Hello world\n" +
-				"3,3,Hello world, how are you?\n" +
-				"3,4,I am fine.\n" +
-				"3,5,Luke Skywalker\n" +
-				"4,6,Comment#1\n" +
-				"4,7,Comment#2\n" +
-				"4,8,Comment#3\n" +
-				"4,9,Comment#4\n" +
-				"5,10,Comment#5\n" +
-				"5,11,Comment#6\n" +
-				"5,12,Comment#7\n" +
-				"5,13,Comment#8\n" +
-				"5,14,Comment#9\n" +
-				"6,15,Comment#10\n" +
-				"6,16,Comment#11\n" +
-				"6,17,Comment#12\n" +
-				"6,18,Comment#13\n" +
-				"6,19,Comment#14\n" +
-				"6,20,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Mapper3 implements MapFunction<CustomType, Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-
-		@Override
-		public Tuple3<Integer, Long, String> map(CustomType value) throws Exception {
-			out.setField(value.myInt, 0);
-			out.setField(value.myLong, 1);
-			out.setField(value.myString, 2);
-			return out;
-		}
-	}
-
-	@Test
-	public void testTypeConversionMapperTupleToBasic() throws Exception {
-		/*
-		 * Test type conversion mapper (Tuple -> Basic)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<String> typeConversionMapDs = ds.
-				map(new Mapper4());
-
-		List<String> result = typeConversionMapDs.collect();
-
-		String expected = "Hi\n" + "Hello\n" + "Hello world\n" +
-				"Hello world, how are you?\n" +
-				"I am fine.\n" + "Luke Skywalker\n" +
-				"Comment#1\n" +	"Comment#2\n" +
-				"Comment#3\n" +	"Comment#4\n" +
-				"Comment#5\n" +	"Comment#6\n" +
-				"Comment#7\n" + "Comment#8\n" +
-				"Comment#9\n" +	"Comment#10\n" +
-				"Comment#11\n" + "Comment#12\n" +
-				"Comment#13\n" + "Comment#14\n" +
-				"Comment#15\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Mapper4 implements MapFunction<Tuple3<Integer, Long, String>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(Tuple3<Integer, Long, String> value) throws Exception {
-			return value.getField(2);
-		}
-	}
-
-	@Test
-	public void testMapperOnTupleIncrementIntegerFieldReorderSecondAndThirdFields() throws
-	Exception {
-		/*
-		 * Test mapper on tuple - Increment Integer field, reorder second and third fields
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, String, Long>> tupleMapDs = ds.
-				map(new Mapper5());
-
-		List<Tuple3<Integer, String, Long>> result = tupleMapDs.collect();
-
-		String expected = "2,Hi,1\n" +
-				"3,Hello,2\n" +
-				"4,Hello world,2\n" +
-				"5,Hello world, how are you?,3\n" +
-				"6,I am fine.,3\n" +
-				"7,Luke Skywalker,3\n" +
-				"8,Comment#1,4\n" +
-				"9,Comment#2,4\n" +
-				"10,Comment#3,4\n" +
-				"11,Comment#4,4\n" +
-				"12,Comment#5,5\n" +
-				"13,Comment#6,5\n" +
-				"14,Comment#7,5\n" +
-				"15,Comment#8,5\n" +
-				"16,Comment#9,5\n" +
-				"17,Comment#10,6\n" +
-				"18,Comment#11,6\n" +
-				"19,Comment#12,6\n" +
-				"20,Comment#13,6\n" +
-				"21,Comment#14,6\n" +
-				"22,Comment#15,6\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Mapper5 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, String, Long>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple3<Integer, String, Long> out = new Tuple3<Integer, String, Long>();
-
-		@Override
-		public Tuple3<Integer, String, Long> map(Tuple3<Integer, Long, String> value)
-				throws Exception {
-			Integer incr = Integer.valueOf(value.f0.intValue() + 1);
-			out.setFields(incr, value.f2, value.f1);
-			return out;
-		}
-	}
-
-	@Test
-	public void testMapperOnCustomLowercaseString() throws Exception {
-		/*
-		 * Test mapper on Custom - lowercase myString
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> customMapDs = ds.
-				map(new Mapper6());
-
-		List<CustomType> result = customMapDs.collect();
-
-		String expected = "1,0,hi\n" +
-				"2,1,hello\n" +
-				"2,2,hello world\n" +
-				"3,3,hello world, how are you?\n" +
-				"3,4,i am fine.\n" +
-				"3,5,luke skywalker\n" +
-				"4,6,comment#1\n" +
-				"4,7,comment#2\n" +
-				"4,8,comment#3\n" +
-				"4,9,comment#4\n" +
-				"5,10,comment#5\n" +
-				"5,11,comment#6\n" +
-				"5,12,comment#7\n" +
-				"5,13,comment#8\n" +
-				"5,14,comment#9\n" +
-				"6,15,comment#10\n" +
-				"6,16,comment#11\n" +
-				"6,17,comment#12\n" +
-				"6,18,comment#13\n" +
-				"6,19,comment#14\n" +
-				"6,20,comment#15\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Mapper6 implements MapFunction<CustomType, CustomType> {
-		private static final long serialVersionUID = 1L;
-		private final CustomType out = new CustomType();
-
-		@Override
-		public CustomType map(CustomType value) throws Exception {
-			out.myInt = value.myInt;
-			out.myLong = value.myLong;
-			out.myString = value.myString.toLowerCase();
-			return out;
-		}
-	}
-
-	@Test
-	public void test() throws Exception {
-		/*
-		 * Test mapper if UDF returns input object - increment first field of a tuple
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
-				map(new Mapper7());
-
-		List<Tuple3<Integer, Long, String>> result = inputObjMapDs.collect();
-
-		String expected = "2,1,Hi\n" +
-				"3,2,Hello\n" +
-				"4,2,Hello world\n" +
-				"5,3,Hello world, how are you?\n" +
-				"6,3,I am fine.\n" +
-				"7,3,Luke Skywalker\n" +
-				"8,4,Comment#1\n" +
-				"9,4,Comment#2\n" +
-				"10,4,Comment#3\n" +
-				"11,4,Comment#4\n" +
-				"12,5,Comment#5\n" +
-				"13,5,Comment#6\n" +
-				"14,5,Comment#7\n" +
-				"15,5,Comment#8\n" +
-				"16,5,Comment#9\n" +
-				"17,6,Comment#10\n" +
-				"18,6,Comment#11\n" +
-				"19,6,Comment#12\n" +
-				"20,6,Comment#13\n" +
-				"21,6,Comment#14\n" +
-				"22,6,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Mapper7 implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
-				throws Exception {
-			Integer incr = Integer.valueOf(value.f0.intValue() + 1);
-			value.setField(incr, 0);
-			return value;
-		}
-	}
-
-	@Test
-	public void testMapWithBroadcastSet() throws Exception {
-		/*
-		 * Test map with broadcast set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
-				map(new RichMapper1()).withBroadcastSet(ints, "ints");
-		List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
-
-		String expected = "55,1,Hi\n" +
-				"55,2,Hello\n" +
-				"55,2,Hello world\n" +
-				"55,3,Hello world, how are you?\n" +
-				"55,3,I am fine.\n" +
-				"55,3,Luke Skywalker\n" +
-				"55,4,Comment#1\n" +
-				"55,4,Comment#2\n" +
-				"55,4,Comment#3\n" +
-				"55,4,Comment#4\n" +
-				"55,5,Comment#5\n" +
-				"55,5,Comment#6\n" +
-				"55,5,Comment#7\n" +
-				"55,5,Comment#8\n" +
-				"55,5,Comment#9\n" +
-				"55,6,Comment#10\n" +
-				"55,6,Comment#11\n" +
-				"55,6,Comment#12\n" +
-				"55,6,Comment#13\n" +
-				"55,6,Comment#14\n" +
-				"55,6,Comment#15\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class RichMapper1 extends RichMapFunction<Tuple3<Integer,Long,String>,
-	Tuple3<Integer,	Long,String>> {
-		private static final long serialVersionUID = 1L;
-		private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
-		private Integer f2Replace = 0;
-
-		@Override
-		public void open(Configuration config) {
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			int sum = 0;
-			for(Integer i : ints) {
-				sum += i;
-			}
-			f2Replace = sum;
-		}
-
-		@Override
-		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
-				throws Exception {
-			out.setFields(f2Replace, value.f1, value.f2);
-			return out;
-		}
-	}
-
-	static final String testKey = "testVariable";
-	static final int testValue = 666;
-
-	@Test
-	public void testPassingConfigurationObject() throws Exception {
-		/*
-		 * Test passing configuration object.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		Configuration conf = new Configuration();
-		conf.setInteger(testKey, testValue);
-		DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
-				map(new RichMapper2()).withParameters(conf);
-		List<Tuple3<Integer, Long, String>> result = bcMapDs.collect();
-
-		String expected = "1,1,Hi\n"
-				+ "2,2,Hello\n"
-				+ "3,2,Hello world";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class RichMapper2 extends RichMapFunction<Tuple3<Integer,Long,String>,
-	Tuple3<Integer,	Long,String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void open(Configuration config) {
-			int val = config.getInteger(testKey, -1);
-			Assert.assertEquals(testValue, val);
-		}
-
-		@Override
-		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) {
-			return value;
-		}
-	}
-}
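
For reference, the core pattern these deleted tests exercise is the plain
DataSet map transformation. A minimal, self-contained sketch of the same API
(class name and input values are illustrative, not taken from
CollectionDataSets):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class MapSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> words = env.fromElements("Hi", "Hello", "Hello world");
            // An identity-style map, as in Mapper1 above; collect() runs the
            // program and returns the results to the client.
            java.util.List<String> result = words
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value;
                        }
                    })
                    .collect();
            System.out.println(result);
        }
    }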

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java
deleted file mode 100644
index cc895c2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapPartitionITCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("serial")
-public class MapPartitionITCase extends JavaProgramTestBase {
-
-	private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n"
-			+ "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n"
-			+ "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n";
-
-	private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n";
-	
-	
-	private List<Tuple2<String, String>> input = new ArrayList<Tuple2<String,String>>();
-	
-	private List<Tuple2<String, Integer>> expected = new ArrayList<Tuple2<String,Integer>>();
-	
-	private List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String,Integer>>();
-	
-
-	@Override
-	protected void preSubmit() throws Exception {
-
-		// create input
-		for (String s :IN.split("\n")) {
-			String[] fields = s.split(" ");
-			input.add(new Tuple2<String, String>(fields[0], fields[1]));
-		}
-		
-		// create expected
-		for (String s : RESULT.split("\n")) {
-			String[] fields = s.split(" ");
-			expected.add(new Tuple2<String, Integer>(fields[0], Integer.parseInt(fields[1])));
-		}
-		
-	}
-	
-	@Override
-	protected void postSubmit() {
-		compareResultCollections(expected, result, new TestBaseUtils.TupleComparator<Tuple2<String, Integer>>());
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<Tuple2<String, String>> data = env.fromCollection(input);
-		
-		data.mapPartition(new TestMapPartition()).output(new LocalCollectionOutputFormat<Tuple2<String,Integer>>(result));
-		
-		env.execute();
-	}
-	
-	
-	public static class TestMapPartition implements MapPartitionFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
-
-		@Override
-		public void mapPartition(Iterable<Tuple2<String, String>> values, Collector<Tuple2<String, Integer>> out) {
-			for (Tuple2<String, String> value : values) {
-				String keyString = value.f0;
-				String valueString = value.f1;
-				
-				int keyInt = Integer.parseInt(keyString);
-				int valueInt = Integer.parseInt(valueString);
-
-				if (keyInt + valueInt < 10) {
-					out.collect(new Tuple2<String, Integer>(valueString, keyInt + 10));
-				}
-			}
-		}
-	}
-}
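
Unlike map, the operator under test here invokes the UDF once per partition
with an Iterable over all of that partition's records, emitting any number of
results through a Collector. A sketch of the same shape (data values and the
filter threshold are illustrative):

    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class MapPartitionSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
            // One call per partition: filter and transform in a single pass,
            // collecting zero or more output records.
            DataSet<Integer> small = numbers.mapPartition(
                new MapPartitionFunction<Integer, Integer>() {
                    @Override
                    public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
                        for (Integer v : values) {
                            if (v < 4) {
                                out.collect(v + 10);
                            }
                        }
                    }
                });
            System.out.println(small.collect());
        }
    }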

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
deleted file mode 100644
index 63abe63..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ObjectReuseITCase.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * These tests check whether the object-reuse execution mode really does reuse objects.
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class ObjectReuseITCase extends MultipleProgramsTestBase {
-
-	private static final List<Tuple2<String, Integer>> REDUCE_DATA =
-		Arrays.asList(
-			new Tuple2<>("a", 1), new Tuple2<>("a", 2),
-			new Tuple2<>("a", 3), new Tuple2<>("a", 4),
-			new Tuple2<>("a", 50));
-
-	private static final List<Tuple2<String, Integer>> GROUP_REDUCE_DATA =
-		Arrays.asList(
-			new Tuple2<>("a", 1), new Tuple2<>("a", 2),
-			new Tuple2<>("a", 3), new Tuple2<>("a", 4),
-			new Tuple2<>("a", 5));
-	
-	
-	private final boolean objectReuse;
-	
-	public ObjectReuseITCase(boolean objectReuse) {
-		super(TestExecutionMode.CLUSTER);
-		this.objectReuse = objectReuse;
-	}
-
-	@Test
-	public void testKeyedReduce() throws Exception {
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		if (objectReuse) {
-			env.getConfig().enableObjectReuse();
-		} else {
-			env.getConfig().disableObjectReuse();
-		}
-
-		DataSet<Tuple2<String, Integer>> input = env.fromCollection(REDUCE_DATA);
-		
-		DataSet<Tuple2<String, Integer>> result = input
-			.groupBy(0)
-			.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
-
-				@Override
-				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
-					value2.f1 += value1.f1;
-					return value2;
-				}
-			});
-
-		Tuple2<String, Integer> res = result.collect().get(0);
-		assertEquals(new Tuple2<>("a", 60), res);
-	}
-
-	@Test
-	public void testGlobalReduce() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		if (objectReuse) {
-			env.getConfig().enableObjectReuse();
-		} else {
-			env.getConfig().disableObjectReuse();
-		}
-
-		DataSet<Tuple2<String, Integer>> input = env.fromCollection(REDUCE_DATA);
-
-		DataSet<Tuple2<String, Integer>> result = input.reduce(
-			new ReduceFunction<Tuple2<String, Integer>>() {
-
-				@Override
-				public Tuple2<String, Integer> reduce(
-						Tuple2<String, Integer> value1,
-						Tuple2<String, Integer> value2) {
-					
-					if (value1.f1 % 3 == 0) {
-						value1.f1 += value2.f1;
-						return value1;
-					} else {
-						value2.f1 += value1.f1;
-						return value2;
-					}
-				}
-
-			});
-
-		Tuple2<String, Integer> res = result.collect().get(0);
-		assertEquals(new Tuple2<>("a", 60), res);
-	}
-
-	@Test
-	public void testKeyedGroupReduce() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		if (objectReuse) {
-			env.getConfig().enableObjectReuse();
-		} else {
-			env.getConfig().disableObjectReuse();
-		}
-
-		DataSet<Tuple2<String, Integer>> input = env.fromCollection(GROUP_REDUCE_DATA);
-
-		DataSet<Tuple2<String, Integer>> result = input.groupBy(0).reduceGroup(
-			new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
-				@Override
-				public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
-					List<Tuple2<String, Integer>> list = new ArrayList<>();
-					for (Tuple2<String, Integer> val : values) {
-						list.add(val);
-					}
-
-					for (Tuple2<String, Integer> val : list) {
-						out.collect(val);
-					}
-				}
-			});
-
-		List<Tuple2<String, Integer>> is = result.collect();
-		Collections.sort(is, new TupleComparator<Tuple2<String, Integer>>());
-
-		List<Tuple2<String, Integer>> expected = env.getConfig().isObjectReuseEnabled() ?
-			Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 4),
-				new Tuple2<>("a", 5), new Tuple2<>("a", 5), new Tuple2<>("a", 5)) :
-			Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2),
-				new Tuple2<>("a", 3), new Tuple2<>("a", 4), new Tuple2<>("a", 5));
-
-		assertEquals(expected, is);
-	}
-	
-	@Test
-	public void testGlobalGroupReduce() throws Exception {
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		if (objectReuse) {
-			env.getConfig().enableObjectReuse();
-		} else {
-			env.getConfig().disableObjectReuse();
-		}
-
-		DataSet<Tuple2<String, Integer>> input = env.fromCollection(GROUP_REDUCE_DATA);
-		
-		DataSet<Tuple2<String, Integer>> result = input.reduceGroup(
-			new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-
-				@Override
-				public void reduce(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
-					List<Tuple2<String, Integer>> list = new ArrayList<>();
-					for (Tuple2<String, Integer> val : values) {
-						list.add(val);
-					}
-	
-					for (Tuple2<String, Integer> val : list) {
-						out.collect(val);
-					}
-				}
-			});
-
-		List<Tuple2<String, Integer>> is = result.collect();
-		Collections.sort(is, new TupleComparator<Tuple2<String, Integer>>());
-
-		List<Tuple2<String, Integer>> expected = env.getConfig().isObjectReuseEnabled() ?
-			Arrays.asList(new Tuple2<>("a", 4), new Tuple2<>("a", 4),
-				new Tuple2<>("a", 5), new Tuple2<>("a", 5), new Tuple2<>("a", 5)) :
-			Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2),
-				new Tuple2<>("a", 3), new Tuple2<>("a", 4), new Tuple2<>("a", 5));
-		
-		assertEquals(expected, is);
-	}
-
-	@Parameterized.Parameters(name = "Execution mode = CLUSTER, Reuse = {0}")
-	public static Collection<Object[]> executionModes() {
-		return Arrays.asList(
-			new Object[] { false, },
-			new Object[] { true } );
-	}
-}
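
The subtlety these tests pin down: with env.getConfig().enableObjectReuse(),
the runtime may hand the UDF the same tuple instance over and over, so
buffering references (as the group-reduce functions above do) later observes
mutated values, which is why the two expected lists differ. A sketch of the
defensive idiom, copying before buffering (illustrative, not part of the test
above):

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    import java.util.ArrayList;
    import java.util.List;

    // Safe under object reuse: copy each tuple before buffering it, because
    // the runtime may recycle the instance it passes in.
    public class CopyingGroupReduce
            implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values,
                Collector<Tuple2<String, Integer>> out) {
            List<Tuple2<String, Integer>> buffer = new ArrayList<>();
            for (Tuple2<String, Integer> value : values) {
                buffer.add(value.copy()); // copy() returns an independent Tuple2
            }
            for (Tuple2<String, Integer> value : buffer) {
                out.collect(value);
            }
        }
    }

Used as input.reduceGroup(new CopyingGroupReduce()), this emits the original
values under both execution modes.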

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
deleted file mode 100644
index 5215a36..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
+++ /dev/null
@@ -1,680 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Collection;
-import java.util.List;
-
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class OuterJoinITCase extends MultipleProgramsTestBase {
-
-	public OuterJoinITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testLeftOuterJoin1() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
-	}
-
-	@Test
-	public void testLeftOuterJoin2() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
-	}
-
-	@Test
-	public void testLeftOuterJoin3() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
-	}
-
-	@Test
-	public void testLeftOuterJoin4() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
-	}
-
-	@Test (expected = InvalidProgramException.class)
-	public void testLeftOuterJoin5() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
-	}
-
-	private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
-		/*
-		 * UDF Join on tuples with key field positions
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.leftOuterJoin(ds2, hint)
-						.where(0)
-						.equalTo(0)
-						.with(new T3T5FlatJoin());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "Hi,Hallo\n" +
-				"Hello,Hallo Welt\n" +
-				"Hello,Hallo Welt wie\n" +
-				"Hello world,null\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testRightOuterJoin1() throws Exception {
-		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
-	}
-
-	@Test
-	public void testRightOuterJoin2() throws Exception {
-		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
-	}
-
-	@Test
-	public void testRightOuterJoin3() throws Exception {
-		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
-	}
-
-	@Test
-	public void testRightOuterJoin4() throws Exception {
-		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
-	}
-
-	@Test (expected = InvalidProgramException.class)
-	public void testRightOuterJoin5() throws Exception {
-		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
-	}
-
-	private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
-		/*
-		 * UDF Join on tuples with key field positions
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.rightOuterJoin(ds2, hint)
-						.where(1)
-						.equalTo(1)
-						.with(new T3T5FlatJoin());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "Hi,Hallo\n" +
-				"Hello,Hallo Welt\n" +
-				"null,Hallo Welt wie\n" +
-				"Hello world,Hallo Welt\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testFullOuterJoin1() throws Exception {
-		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
-	}
-
-	@Test
-	public void testFullOuterJoin2() throws Exception {
-		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
-	}
-
-	@Test
-	public void testFullOuterJoin3() throws Exception {
-		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
-	}
-
-	@Test (expected = InvalidProgramException.class)
-	public void testFullOuterJoin4() throws Exception {
-		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
-	}
-
-	@Test (expected = InvalidProgramException.class)
-	public void testFullOuterJoin5() throws Exception {
-		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
-	}
-
-	private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
-		/*
-		 * UDF Join on tuples with key field positions
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.fullOuterJoin(ds2, hint)
-						.where(0)
-						.equalTo(2)
-						.with(new T3T5FlatJoin());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "null,Hallo\n" +
-				"Hi,Hallo Welt\n" +
-				"Hello,Hallo Welt wie\n" +
-				"Hello world,null\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinOnTuplesWithCompositeKeyPositions() throws Exception {
-		/*
-		 * UDF Join on tuples with multiple key field positions
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where(0, 1)
-						.equalTo(0, 4)
-						.with(new T3T5FlatJoin());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "Hi,Hallo\n" +
-				"Hello,Hallo Welt\n" +
-				"Hello world,null\n" +
-				"null,Hallo Welt wie\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinWithBroadcastSet() throws Exception {
-		/*
-		 * Join with broadcast set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple3<String, String, Integer>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where(1)
-						.equalTo(4)
-						.with(new T3T5BCJoin())
-						.withBroadcastSet(intDs, "ints");
-
-		List<Tuple3<String, String, Integer>> result = joinDs.collect();
-
-		String expected = "Hi,Hallo,55\n" +
-				"Hi,Hallo Welt wie,55\n" +
-				"Hello,Hallo Welt,55\n" +
-				"Hello world,Hallo Welt,55\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinWithMixedKeyTypes1() throws Exception {
-		/*
-		 * Join on a tuple input with key field selector and a custom type input with key extractor
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds1 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where(new KeySelector1())
-						.equalTo(0)
-						.with(new CustT3Join());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "Hi,Hi\n" +
-				"Hello,Hello\n" +
-				"Hello world,Hello\n" +
-				"null,Hello world\n";
-
-		compareResultAsTuples(result, expected);
-
-	}
-
-	public static class KeySelector1 implements KeySelector<CustomType, Integer> {
-		@Override
-		public Integer getKey(CustomType value) {
-			return value.myInt;
-		}
-	}
-
-
-	@Test
-	public void testJoinWithMixedKeyTypes2()
-			throws Exception {
-		/*
-		 * Join on a tuple input with key field selector and a custom type input with key extractor
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where(1)
-						.equalTo(new KeySelector2())
-						.with(new T3CustJoin());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "null,Hi\n" +
-				"Hi,Hello\n" +
-				"Hello,Hello world\n" +
-				"Hello world,Hello world\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class KeySelector2 implements KeySelector<CustomType, Long> {
-		@Override
-		public Long getKey(CustomType value) {
-			return value.myLong;
-		}
-	}
-
-	@Test
-	public void testJoinWithTupleReturningKeySelectors() throws Exception {
-		/*
-		 * UDF Join on tuples with tuple-returning key selectors
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<String, String>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where(new KeySelector3()) //0, 1
-						.equalTo(new KeySelector4()) // 0, 4
-						.with(new T3T5FlatJoin());
-
-		List<Tuple2<String, String>> result = joinDs.collect();
-
-		String expected = "Hi,Hallo\n" +
-				"Hello,Hallo Welt\n" +
-				"Hello world,null\n" +
-				"null,Hallo Welt wie\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class KeySelector3 implements KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
-			return new Tuple2<>(t.f0, t.f1);
-		}
-	}
-
-	public static class KeySelector4 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
-			return new Tuple2<>(t.f0, t.f4);
-		}
-	}
-
-	@Test
-	public void testJoinWithNestedKeyExpression1() throws Exception {
-		/*
-		 * Join nested pojo against tuple (selected using a string)
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("nestedPojo.longNumber")
-						.equalTo("f6")
-						.with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
-
-		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
-
-		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinWithNestedKeyExpression2() throws Exception {
-		/*
-		 * Join nested pojo against tuple (selected as an integer)
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("nestedPojo.longNumber")
-						.equalTo(6) // <--- difference!
-						.with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
-
-		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
-
-		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinWithCompositeKeyExpressions() throws Exception {
-		/*
-		 * selecting multiple fields using expression language
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("nestedPojo.longNumber", "number", "str")
-						.equalTo("f6", "f0", "f1")
-						.with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
-
-		env.setParallelism(1);
-		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
-
-		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testNestedIntoTuple() throws Exception {
-		/*
-		 * nested into tuple
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("nestedPojo.longNumber", "number", "nestedTupleWithCustom.f0")
-						.equalTo("f6", "f0", "f2")
-						.with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
-
-		env.setParallelism(1);
-		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
-
-		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testNestedIntoTupleIntoPojo() throws Exception {
-		/*
-		 * nested into tuple into pojo
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("nestedTupleWithCustom.f0", "nestedTupleWithCustom.f1.myInt", "nestedTupleWithCustom.f1.myLong")
-						.equalTo("f2", "f3", "f4")
-						.with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
-
-		env.setParallelism(1);
-		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
-
-		String expected = "1 First (10,100,1000,One) 10000,(1,First,10,100,1000,One,10000)\n" +
-				"2 Second (20,200,2000,Two) 20000,(2,Second,20,200,2000,Two,20000)\n" +
-				"3 Third (30,300,3000,Three) 30000,(3,Third,30,300,3000,Three,30000)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testNonPojoToVerifyFullTupleKeys() throws Exception {
-		/*
-		 * Non-POJO test to verify that full-tuple keys are working.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-		DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where(0)
-						.equalTo("f0.f0", "f0.f1") // key is now Tuple2<Integer, Integer>
-						.with(new ProjectBothFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>());
-
-		env.setParallelism(1);
-		List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result = joinDs.collect();
-
-		String expected = "((1,1),one),((1,1),one)\n" +
-				"((2,2),two),((2,2),two)\n" +
-				"((3,3),three),((3,3),three)\n";
-
-		compareResultAsTuples(result, expected);
-
-	}
-
-	@Test
-	public void testNonPojoToVerifyNestedTupleElementSelection() throws Exception {
-		/*
-		 * Non-POJO test to verify "nested" tuple-element selection.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds1 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-		DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds2 = CollectionDataSets.getSmallNestedTupleDataSet(env);
-		DataSet<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("f0.f0")
-						.equalTo("f0.f0") // key is now Integer from Tuple2<Integer, Integer>
-						.with(new ProjectBothFunction<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>());
-
-		env.setParallelism(1);
-		List<Tuple2<Tuple2<Tuple2<Integer, Integer>, String>, Tuple2<Tuple2<Integer, Integer>, String>>> result = joinDs.collect();
-
-		String expected = "((1,1),one),((1,1),one)\n" +
-				"((2,2),two),((2,2),two)\n" +
-				"((3,3),three),((3,3),three)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testFullPojoWithFullTuple() throws Exception {
-		/*
-		 * full pojo with full tuple
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds1 = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> ds2 = CollectionDataSets.getSmallTuplebasedDataSetMatchingPojo(env);
-		DataSet<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> joinDs =
-				ds1.fullOuterJoin(ds2)
-						.where("*")
-						.equalTo("*")
-						.with(new ProjectBothFunction<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>());
-
-		env.setParallelism(1);
-		List<Tuple2<POJO, Tuple7<Long, Integer, Integer, Long, String, Integer, String>>> result = joinDs.collect();
-
-		String expected = "1 First (10,100,1000,One) 10000,(10000,10,100,1000,One,1,First)\n" +
-				"2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n" +
-				"3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinWithAtomicType1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Integer> ds2 = env.fromElements(1, 2);
-
-		DataSet<Tuple2<Tuple3<Integer, Long, String>, Integer>> joinDs = ds1
-				.fullOuterJoin(ds2)
-				.where(0)
-				.equalTo("*")
-				.with(new ProjectBothFunction<Tuple3<Integer, Long, String>, Integer>())
-				.returns("Tuple2<java.lang.Object,java.lang.Object>");
-
-		List<Tuple2<Tuple3<Integer, Long, String>, Integer>> result = joinDs.collect();
-
-		String expected = "(1,1,Hi),1\n" +
-				"(2,2,Hello),2\n" +
-				"(3,2,Hello world),null\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testJoinWithAtomicType2() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> ds1 = env.fromElements(1, 2);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-
-		DataSet<Tuple2<Integer, Tuple3<Integer, Long, String>>> joinDs = ds1
-				.fullOuterJoin(ds2)
-				.where("*")
-				.equalTo(0)
-				.with(new ProjectBothFunction<Integer, Tuple3<Integer, Long, String>>())
-				.returns("Tuple2<java.lang.Object,java.lang.Object>");
-
-
-		List<Tuple2<Integer, Tuple3<Integer, Long, String>>> result = joinDs.collect();
-
-		String expected = "1,(1,1,Hi)\n" +
-				"2,(2,2,Hello)\n" +
-				"null,(3,2,Hello world)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
-
-		@Override
-		public void join(Tuple3<Integer, Long, String> first,
-				Tuple5<Integer, Long, Integer, String, Long> second,
-				Collector<Tuple2<String, String>> out) {
-
-			out.collect(new Tuple2<>(first == null ? null : first.f2, second == null ? null : second.f3));
-		}
-
-	}
-
-	public static class T3T5BCJoin extends RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, String, Integer>> {
-
-		private int broadcast;
-
-		@Override
-		public void open(Configuration config) {
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			int sum = 0;
-			for (Integer i : ints) {
-				sum += i;
-			}
-			broadcast = sum;
-		}
-
-		@Override
-		public void join(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second,
-				Collector<Tuple3<String, String, Integer>> out) throws Exception {
-			out.collect(new Tuple3<>(first == null ? null : first.f2, second == null ? null : second.f3, broadcast));
-		}
-	}
-
-	public static class T3CustJoin implements JoinFunction<Tuple3<Integer, Long, String>, CustomType, Tuple2<String, String>> {
-
-		@Override
-		public Tuple2<String, String> join(Tuple3<Integer, Long, String> first,
-				CustomType second) {
-
-			return new Tuple2<>(first == null ? null : first.f2, second == null ? null : second.myString);
-		}
-	}
-
-	public static class CustT3Join implements JoinFunction<CustomType, Tuple3<Integer, Long, String>, Tuple2<String, String>> {
-
-		@Override
-		public Tuple2<String, String> join(CustomType first, Tuple3<Integer, Long, String> second) {
-
-			return new Tuple2<>(first == null ? null : first.myString, second == null ? null : second.f2);
-		}
-	}
-
-	/**
-	 * Deliberately untyped join function, which emits a Tuple2 of the left and right side.
-	 */
-	public static class ProjectBothFunction<IN1, IN2> implements JoinFunction<IN1, IN2, Tuple2<IN1, IN2>> {
-		@Override
-		public Tuple2<IN1, IN2> join(IN1 first, IN2 second) throws Exception {
-			return new Tuple2<>(first, second);
-		}
-	}
-}
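
The contract that T3T5FlatJoin and the other join UDFs above rely on is that
the record from the non-matching side of an outer join arrives as null, so
the function must null-check its inputs. A minimal left outer join in the
same style (keys and strings are illustrative):

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class LeftOuterJoinSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Integer, String>> left = env.fromElements(
                new Tuple2<>(1, "Hi"), new Tuple2<>(2, "Hello"), new Tuple2<>(3, "Hello world"));
            DataSet<Tuple2<Integer, String>> right = env.fromElements(
                new Tuple2<>(1, "Hallo"), new Tuple2<>(2, "Hallo Welt"));
            // Unmatched left rows get a null right side; the UDF handles it.
            DataSet<Tuple2<String, String>> joined = left
                .leftOuterJoin(right)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                        Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> join(Tuple2<Integer, String> l,
                            Tuple2<Integer, String> r) {
                        return new Tuple2<>(l.f1, r == null ? null : r.f1);
                    }
                });
            // (3, "Hello world") has no match and pairs with null.
            System.out.println(joined.collect());
        }
    }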

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
deleted file mode 100644
index 85d70e3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/PartitionITCase.java
+++ /dev/null
@@ -1,848 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.MapPartitionFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.AggregateOperator;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-@SuppressWarnings("serial")
-public class PartitionITCase extends MultipleProgramsTestBase {
-
-	public PartitionITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testHashPartitionByKeyField() throws Exception {
-		/*
-		 * Test hash partition by key field
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
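-		// all tuples with the same f1 value hash to the same partition, so emitting the
-		// distinct keys of each partition must yield every Long key exactly once overall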
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash(1)
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionByKeyField() throws Exception {
-		/*
-		 * Test range partition by key field
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
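-		// range partitioning assigns disjoint key ranges to the partitions, so the same
-		// per-partition distinct-keys check as in the hash variant applies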
-		DataSet<Long> uniqLongs = ds
-			.partitionByRange(1)
-			.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-			"2\n" +
-			"3\n" +
-			"4\n" +
-			"5\n" +
-			"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionByKeyField2() throws Exception {
-		/*
-		 * Test hash partition by key field
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
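-		// hash partitioning on (f1, f2) co-partitions the data on the same keys that the
-		// subsequent groupBy uses, so the grouping operates on already co-partitioned data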
-		AggregateOperator<Tuple3<Integer, Long, String>> sum = ds
-			.map(new PrefixMapper())
-			.partitionByHash(1, 2)
-			.groupBy(1, 2)
-			.sum(0);
-
-		List<Tuple3<Integer, Long, String>> result = sum.collect();
-
-		String expected = "(1,1,Hi)\n" +
-			"(5,2,Hello)\n" +
-			"(4,3,Hello)\n" +
-			"(5,3,I am )\n" +
-			"(6,3,Luke )\n" +
-			"(34,4,Comme)\n" +
-			"(65,5,Comme)\n" +
-			"(111,6,Comme)";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionByKeyField2() throws Exception {
-		/*
-		 * Test range partition by key field
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		AggregateOperator<Tuple3<Integer, Long, String>> sum = ds
-			.map(new PrefixMapper())
-			.partitionByRange(1, 2)
-			.groupBy(1, 2)
-			.sum(0);
-
-		List<Tuple3<Integer, Long, String>> result = sum.collect();
-
-		String expected = "(1,1,Hi)\n" +
-		"(5,2,Hello)\n" +
-		"(4,3,Hello)\n" +
-		"(5,3,I am )\n" +
-		"(6,3,Luke )\n" +
-		"(34,4,Comme)\n" +
-		"(65,5,Comme)\n" +
-		"(111,6,Comme)";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionOfAtomicType() throws Exception {
-		/*
-		 * Test hash partition of atomic type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
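-		// "*" selects the whole element as the key for an atomic (non-composite) type;
-		// the union duplicates every value, yet each key still lands in a single partition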
-		DataSet<Long> uniqLongs = env.generateSequence(1, 6)
-			.union(env.generateSequence(1, 6))
-			.rebalance()
-			.partitionByHash("*")
-			.mapPartition(new UniqueLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-			"2\n" +
-			"3\n" +
-			"4\n" +
-			"5\n" +
-			"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionOfAtomicType() throws Exception {
-		/*
-		 * Test range partition of atomic type
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Long> uniqLongs = env.generateSequence(1, 6)
-			.union(env.generateSequence(1, 6))
-			.rebalance()
-			.partitionByRange("*")
-			.mapPartition(new UniqueLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-			"2\n" +
-			"3\n" +
-			"4\n" +
-			"5\n" +
-			"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionByKeySelector() throws Exception {
-		/*
-		 * Test hash partition by key selector
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash(new KeySelector1())
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	private static class PrefixMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		@Override
-		public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) throws Exception {
-			if (value.f2.length() > 5) {
-				value.f2 = value.f2.substring(0, 5);
-			}
-			return value;
-		}
-	}
-
-	@Test
-	public void testRangePartitionByKeySelector() throws Exception {
-		/*
-		 * Test range partition by key selector
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-			.partitionByRange(new KeySelector1())
-			.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-			"2\n" +
-			"3\n" +
-			"4\n" +
-			"5\n" +
-			"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class KeySelector1 implements KeySelector<Tuple3<Integer, Long, String>, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
-			return value.f1;
-		}
-	}
-
-	@Test
-	public void testForcedRebalancing() throws Exception {
-		/*
-		 * Test forced rebalancing
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// generate some numbers in parallel
-		DataSet<Long> ds = env.generateSequence(1, 3000);
-		DataSet<Tuple2<Integer, Integer>> uniqLongs = ds
-				// introduce some partition skew by filtering
-				.filter(new Filter1())
-				// rebalance
-				.rebalance()
-				// count values in each partition
-				.map(new PartitionIndexMapper())
-				.groupBy(0)
-				.reduce(new Reducer1())
-				// round counts to mitigate runtime scheduling effects (lazy split assignment)
-				.map(new Mapper1());
-
-		List<Tuple2<Integer, Integer>> result = uniqLongs.collect();
-
-		StringBuilder expected = new StringBuilder();
-		int numPerPartition = 2220 / env.getParallelism() / 10;
-		for (int i = 0; i < env.getParallelism(); i++) {
-			expected.append('(').append(i).append(',').append(numPerPartition).append(")\n");
-		}
-
-		compareResultAsText(result, expected.toString());
-	}
-
-	public static class Filter1 implements FilterFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Long value) throws Exception {
-			return value > 780;
-		}
-	}
-
-	public static class Reducer1 implements ReduceFunction<Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
-			return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
-		}
-	}
-
-	public static class Mapper1 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-			value.f1 = value.f1 / 10;
-			return value;
-		}
-	}
-
-	@Test
-	public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
-		/*
-		 * Test hash partition by key field and different parallelism
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
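-		// the partitioning runs with parallelism 4 while the rest of the job uses 3,
-		// exercising a parallelism change across the partitioning boundary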
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash(1).setParallelism(4)
-				.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionByKeyFieldAndDifferentParallelism() throws Exception {
-		/*
-		 * Test range partition by key field and different parallelism
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Long> uniqLongs = ds
-			.partitionByRange(1).setParallelism(4)
-			.mapPartition(new UniqueTupleLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "1\n" +
-			"2\n" +
-			"3\n" +
-			"4\n" +
-			"5\n" +
-			"6\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testHashPartitionWithKeyExpression() throws Exception {
-		/*
-		 * Test hash partition with key expression
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
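-		// key expressions can address nested POJO fields such as "nestedPojo.longNumber"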
-		DataSet<Long> uniqLongs = ds
-				.partitionByHash("nestedPojo.longNumber").setParallelism(4)
-				.mapPartition(new UniqueNestedPojoLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "10000\n" +
-				"20000\n" +
-				"30000\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testRangePartitionWithKeyExpression() throws Exception {
-		/*
-		 * Test range partition with key expression
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(3);
-
-		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-		DataSet<Long> uniqLongs = ds
-			.partitionByRange("nestedPojo.longNumber").setParallelism(4)
-			.mapPartition(new UniqueNestedPojoLongMapper());
-		List<Long> result = uniqLongs.collect();
-
-		String expected = "10000\n" +
-			"20000\n" +
-			"30000\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class UniqueTupleLongMapper implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapPartition(Iterable<Tuple3<Integer, Long, String>> records, Collector<Long> out) throws Exception {
-			HashSet<Long> uniq = new HashSet<>();
-			for (Tuple3<Integer, Long, String> t : records) {
-				uniq.add(t.f1);
-			}
-			for (Long l : uniq) {
-				out.collect(l);
-			}
-		}
-	}
-
-	public static class UniqueLongMapper implements MapPartitionFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapPartition(Iterable<Long> longs, Collector<Long> out) throws Exception {
-			HashSet<Long> uniq = new HashSet<>();
-			for (Long l : longs) {
-				uniq.add(l);
-			}
-			for (Long l : uniq) {
-				out.collect(l);
-			}
-		}
-	}
-
-	public static class UniqueNestedPojoLongMapper implements MapPartitionFunction<POJO, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapPartition(Iterable<POJO> records, Collector<Long> out) throws Exception {
-			HashSet<Long> uniq = new HashSet<>();
-			for (POJO t : records) {
-				uniq.add(t.nestedPojo.longNumber);
-			}
-			for (Long l : uniq) {
-				out.collect(l);
-			}
-		}
-	}
-
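-	// tags each record with the index of the subtask that processed it, so that a
-	// groupBy(0).reduce(...) can count how many elements every parallel instance saw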
-	public static class PartitionIndexMapper extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Integer> map(Long value) throws Exception {
-			return new Tuple2<>(this.getRuntimeContext().getIndexOfThisSubtask(), 1);
-		}
-	}
-
-	@Test
-	public void testRangePartitionerOnSequenceData() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSource<Long> dataSource = env.generateSequence(0, 10000);
-		KeySelector<Long, Long> keyExtractor = new ObjectSelfKeySelector();
-
-		MapPartitionFunction<Long, Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(new LongComparator(true));
-
-		Comparator<Tuple2<Long, Long>> tuple2Comparator = new Tuple2Comparator(new LongComparator(true));
-
-		List<Tuple2<Long, Long>> collected = dataSource.partitionByRange(keyExtractor).mapPartition(minMaxSelector).collect();
-		Collections.sort(collected, tuple2Comparator);
-
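-		// the key ranges must be contiguous and non-overlapping: every partition's min
-		// has to follow directly after the previous partition's max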
-		long previousMax = -1;
-		for (Tuple2<Long, Long> tuple2 : collected) {
-			if (previousMax == -1) {
-				previousMax = tuple2.f1;
-			} else {
-				long currentMin = tuple2.f0;
-				assertTrue("Min element in each partition should not exceed its max.", tuple2.f0 <= tuple2.f1);
-				assertEquals(previousMax + 1, currentMin);
-				previousMax = tuple2.f1;
-			}
-		}
-	}
-
-	@Test(expected = InvalidProgramException.class)
-	public void testRangePartitionInIteration() throws Exception {
-
-		// does not apply for collection execution
-		if (super.mode == TestExecutionMode.COLLECTION) {
-			throw new InvalidProgramException("Does not apply for collection execution");
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSource<Long> source = env.generateSequence(0, 10000);
-
-		DataSet<Tuple2<Long, String>> tuples = source.map(new MapFunction<Long, Tuple2<Long, String>>() {
-			@Override
-			public Tuple2<Long, String> map(Long v) throws Exception {
-				return new Tuple2<>(v, Long.toString(v));
-			}
-		});
-
-		DeltaIteration<Tuple2<Long, String>, Tuple2<Long, String>> it = tuples.iterateDelta(tuples, 10, 0);
-		DataSet<Tuple2<Long, String>> body = it.getWorkset()
-			.partitionByRange(1) // Verify that range partition is not allowed in iteration
-			.join(it.getSolutionSet())
-			.where(0).equalTo(0).projectFirst(0).projectSecond(1);
-		DataSet<Tuple2<Long, String>> result = it.closeWith(body, body);
-
-		result.collect(); // should fail
-	}
-
-	@Test
-	public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple2<Long, Long>> dataSet = env.generateSequence(0, 10000)
-				.map(new MapFunction<Long, Tuple2<Long, Long>>() {
-			@Override
-			public Tuple2<Long, Long> map(Long value) throws Exception {
-				return new Tuple2<>(value / 5000, value % 5000);
-			}
-		});
-
-		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
-																			   new LongComparator(false));
-
-		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
-
-		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0, 1)
-				.withOrders(Order.ASCENDING, Order.DESCENDING)
-				.mapPartition(minMaxSelector)
-				.collect();
-
-		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
-
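-		// partitions must be disjoint under the (ascending, descending) composite order;
-		// across a boundary that shares f0, the second field decreases by exactly one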
-		Tuple2<Long, Long> previousMax = null;
-		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
-			assertTrue("Min element in each partition should be smaller than max.",
-					tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
-			if (previousMax == null) {
-				previousMax = tuple2.f1;
-			} else {
-				assertTrue("Partitions overlap. Previous max should be smaller than current min.",
-						tuple2Comparator.compare(previousMax, tuple2.f0) < 0);
-				if (previousMax.f0.equals(tuple2.f0.f0)) {
-					// check that ordering on the second key is correct
-					assertEquals("Ordering on the second field should be continuous.",
-							previousMax.f1 - 1, tuple2.f0.f1.longValue());
-				}
-				previousMax = tuple2.f1;
-			}
-		}
-	}
-
-	@Test
-	public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		final DataSet<Tuple2<Tuple2<Long, Long>, Long>> dataSet = env.generateSequence(0, 10000)
-				.map(new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>() {
-					@Override
-					public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
-						return new Tuple2<>(new Tuple2<>(value / 5000, value % 5000), value);
-					}
-				});
-
-		final Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<>(new LongComparator(true),
-				new LongComparator(true));
-		MinMaxSelector<Tuple2<Long, Long>> minMaxSelector = new MinMaxSelector<>(tuple2Comparator);
-
-		final List<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>> collected = dataSet.partitionByRange(0)
-				.withOrders(Order.ASCENDING)
-				.mapPartition(new MapPartitionFunction<Tuple2<Tuple2<Long,Long>,Long>, Tuple2<Long, Long>>() {
-					@Override
-					public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values,
-											 Collector<Tuple2<Long, Long>> out) throws Exception {
-						for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
-							out.collect(value.f0);
-						}
-					}
-				})
-				.mapPartition(minMaxSelector)
-				.collect();
-
-		Collections.sort(collected, new Tuple2Comparator<>(tuple2Comparator));
-
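-		// same disjointness check as above; with both orders ascending, the second field
-		// increases by exactly one across a boundary that shares the first field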
-		Tuple2<Long, Long> previousMax = null;
-		for (Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> tuple2 : collected) {
-			assertTrue("Min element in each partition should be smaller than max.",
-					tuple2Comparator.compare(tuple2.f0, tuple2.f1) <= 0);
-			if (previousMax == null) {
-				previousMax = tuple2.f1;
-			} else {
-				assertTrue("Partitions overlap. Previous max should be smaller than current min.",
-						tuple2Comparator.compare(previousMax, tuple2.f0) < 0);
-				if (previousMax.f0.equals(tuple2.f0.f0)) {
-					assertEquals("Ordering on the second field should be continuous.",
-							previousMax.f1 + 1, tuple2.f0.f1.longValue());
-				}
-				previousMax = tuple2.f1;
-			}
-		}
-	}
-
-	@Test
-	public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		final DataSet<Tuple2<ComparablePojo, Long>> dataSet = env.generateSequence(0, 10000)
-				.map(new MapFunction<Long, Tuple2<ComparablePojo, Long>>() {
-					@Override
-					public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
-						return new Tuple2<>(new ComparablePojo(value / 5000, value % 5000), value);
-					}
-				});
-
-		final List<Tuple2<ComparablePojo, ComparablePojo>> collected = dataSet
-				.partitionByRange(new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>() {
-					@Override
-					public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
-						return value.f0;
-					}
-				})
-				.withOrders(Order.ASCENDING)
-				.mapPartition(new MinMaxSelector<>(new ComparablePojoComparator()))
-				.mapPartition(new ExtractComparablePojo())
-				.collect();
-
-		final Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator =
-				new Comparator<Tuple2<ComparablePojo, ComparablePojo>>() {
-			@Override
-			public int compare(Tuple2<ComparablePojo, ComparablePojo> o1,
-							   Tuple2<ComparablePojo, ComparablePojo> o2) {
-				return o1.f0.compareTo(o2.f0);
-			}
-		};
-		Collections.sort(collected, pojoComparator);
-
-		ComparablePojo previousMax = null;
-		for (Tuple2<ComparablePojo, ComparablePojo> element : collected) {
-			assertTrue("Min element in each partition should be smaller than max.",
-					element.f0.compareTo(element.f1) <= 0);
-			if (previousMax == null) {
-				previousMax = element.f1;
-			} else {
-				assertTrue("Partitions overlap. Previous max should be smaller than current min.",
-						previousMax.compareTo(element.f0) < 0);
-				if (previousMax.first.equals(element.f0.first)) {
-					assertEquals("Ordering on the second field should be continuous.",
-							previousMax.second - 1, element.f0.second.longValue());
-				}
-				previousMax = element.f1;
-			}
-		}
-	}
-
-	private static class ExtractComparablePojo implements MapPartitionFunction<
-			Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>,
-			Tuple2<ComparablePojo, ComparablePojo>> {
-
-		@Override
-		public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values,
-								 Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
-			for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
-				out.collect(new Tuple2<>(value.f0.f0, value.f1.f0));
-			}
-		}
-	}
-
-	private static class ComparablePojoComparator implements Comparator<Tuple2<ComparablePojo, Long>>, Serializable {
-
-		@Override
-		public int compare(Tuple2<ComparablePojo, Long> o1,
-						   Tuple2<ComparablePojo, Long> o2) {
-			return o1.f0.compareTo(o2.f0);
-		}
-	}
-
-	private static class ComparablePojo implements Comparable<ComparablePojo> {
-		private Long first;
-		private Long second;
-
-		public ComparablePojo(Long first, Long second) {
-			this.first = first;
-			this.second = second;
-		}
-
-		public ComparablePojo() {
-		}
-
-		public Long getFirst() {
-			return first;
-		}
-
-		public void setFirst(Long first) {
-			this.first = first;
-		}
-
-		public Long getSecond() {
-			return second;
-		}
-
-		public void setSecond(Long second) {
-			this.second = second;
-		}
-
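-		// ascending on the first field, descending on the second (note the negation)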
-		@Override
-		public int compareTo(ComparablePojo o) {
-			final int firstResult = Long.compare(this.first, o.first);
-			if (firstResult == 0) {
-				return (-1) * Long.compare(this.second, o.second);
-			}
-
-			return firstResult;
-		}
-	}
-
-	private static class ObjectSelfKeySelector implements KeySelector<Long, Long> {
-		@Override
-		public Long getKey(Long value) throws Exception {
-			return value;
-		}
-	}
-
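-	// condenses each partition into a single (min, max) tuple under the given comparator,
-	// which is all the range-partitioning tests need to check that key intervals are disjoint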
-	private static class MinMaxSelector<T> implements MapPartitionFunction<T, Tuple2<T, T>> {
-
-		private final Comparator<T> comparator;
-
-		public MinMaxSelector(Comparator<T> comparator) {
-			this.comparator = comparator;
-		}
-
-		@Override
-		public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
-			Iterator<T> itr = values.iterator();
-			if (!itr.hasNext()) {
-				// an empty partition has no min/max to report
-				return;
-			}
-			T min = itr.next();
-			T max = min;
-			T value;
-			while (itr.hasNext()) {
-				value = itr.next();
-				if (comparator.compare(value, min) < 0) {
-					min = value;
-				}
-				if (comparator.compare(value, max) > 0) {
-					max = value;
-				}
-			}
-
-			Tuple2<T, T> result = new Tuple2<>(min, max);
-			out.collect(result);
-		}
-	}
-
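-	// lexicographic comparison of (f0, f1) pairs using the two field comparators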
-	private static class Tuple2Comparator<T> implements Comparator<Tuple2<T, T>>, Serializable {
-
-		private final Comparator<T> firstComparator;
-		private final Comparator<T> secondComparator;
-
-		public Tuple2Comparator(Comparator<T> comparator) {
-			this(comparator, comparator);
-		}
-
-		public Tuple2Comparator(Comparator<T> firstComparator,
-								Comparator<T> secondComparator) {
-			this.firstComparator = firstComparator;
-			this.secondComparator = secondComparator;
-		}
-
-		@Override
-		public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
-			int result = firstComparator.compare(first.f0, second.f0);
-			if (result > 0) {
-				return 1;
-			} else if (result < 0) {
-				return -1;
-			}
-
-			result = secondComparator.compare(first.f1, second.f1);
-			if (result > 0) {
-				return 1;
-			} else if (result < 0) {
-				return -1;
-			}
-
-			return 0;
-		}
-	}
-
-	private static class LongComparator implements Comparator<Long>, Serializable {
-
-		private final boolean ascending;
-
-		public LongComparator(boolean ascending) {
-			this.ascending = ascending;
-		}
-
-		@Override
-		public int compare(Long o1, Long o2) {
-			if (ascending) {
-				return Long.compare(o1, o2);
-			} else {
-				return (-1) * Long.compare(o1, o2);
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
deleted file mode 100644
index 1054c62..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-public class ProjectITCase extends JavaProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		/*
-		 * Projection with tuple fields indexes
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
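-		// project(3, 4, 2) keeps only fields 3, 4 and 2 of the Tuple5, in that order,
-		// producing a Tuple3<String, Long, Integer>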
-		DataSet<Tuple3<String, Long, Integer>> projDs = ds
-				.project(3, 4, 2);
-		List<Tuple3<String, Long, Integer>> result = projDs.collect();
-
-		String expectedResult = "Hallo,1,0\n" +
-				"Hallo Welt,2,1\n" +
-				"Hallo Welt wie,1,2\n" +
-				"Hallo Welt wie gehts?,2,3\n" +
-				"ABC,2,4\n" +
-				"BCD,3,5\n" +
-				"CDE,2,6\n" +
-				"DEF,1,7\n" +
-				"EFG,1,8\n" +
-				"FGH,2,9\n" +
-				"GHI,1,10\n" +
-				"HIJ,3,11\n" +
-				"IJK,3,12\n" +
-				"JKL,2,13\n" +
-				"KLM,2,14\n";
-
-		compareResultAsTuples(result, expectedResult);
-	}
-
-}