You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:16 UTC

[16/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
deleted file mode 100644
index 4ca0f3e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ /dev/null
@@ -1,992 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CoGroupITCase extends MultipleProgramsTestBase {
-
-	public CoGroupITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	/*
-	 * CoGroup on tuples with key field selector
-	 */
-	@Test
-	public void testCoGroupTuplesWithKeyFieldSelector() throws Exception {
-		
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple2<Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroup());
-
-		List<Tuple2<Integer, Integer>> result = coGroupDs.collect();
-
-		String expected = "1,0\n" +
-				"2,6\n" +
-				"3,24\n" +
-				"4,60\n" +
-				"5,120\n";
-		
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCoGroupOnTwoCustomTypeInputsWithKeyExtractors() throws Exception {
-		/*
-		 * CoGroup on two custom type inputs with key extractors
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where(new KeySelector4()).equalTo(new
-				KeySelector5()).with(new CustomTypeCoGroup());
-
-		List<CustomType> result = coGroupDs.collect();
-
-		String expected = "1,0,test\n" +
-				"2,6,test\n" +
-				"3,24,test\n" +
-				"4,60,test\n" +
-				"5,120,test\n" +
-				"6,210,test\n";
-		
-		compareResultAsText(result, expected);
-	}
-
-	public static class KeySelector4 implements KeySelector<CustomType, Integer> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Integer getKey(CustomType in) {
-			return in.myInt;
-		}
-	}
-
-	public static class KeySelector5 implements KeySelector<CustomType, Integer> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Integer getKey(CustomType in) {
-			return in.myInt;
-		}
-	}
-
-	@Test
-	public void testCorrectnessOfCoGroupIfUDFReturnsLeftInputObjects() throws Exception {
-		/*
-		 * check correctness of cogroup if UDF returns left input objects
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple3ReturnLeft());
-
-		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n";
-		
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCoGroupIfUDFReturnsRightInputObjects() throws Exception {
-		/*
-		 * check correctness of cogroup if UDF returns right input objects
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5ReturnRight());
-		
-		List<Tuple5<Integer, Long, Integer, String, Long>> result = coGroupDs.collect();
-
-		String expected = "1,1,0,Hallo,1\n" +
-				"2,2,1,Hallo Welt,2\n" +
-				"2,3,2,Hallo Welt wie,1\n" +
-				"3,4,3,Hallo Welt wie gehts?,2\n" +
-				"3,5,4,ABC,2\n" +
-				"3,6,5,BCD,3\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithBroadcastSet() throws Exception {
-		/*
-		 * Reduce with broadcast set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Integer, Integer>> coGroupDs = ds.coGroup(ds2).where(0).equalTo(0).with(new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
-
-		List<Tuple3<Integer, Integer, Integer>> result = coGroupDs.collect();
-
-		String expected = "1,0,55\n" +
-				"2,6,55\n" +
-				"3,24,55\n" +
-				"4,60,55\n" +
-				"5,120,55\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCoGroupOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
-	throws Exception {
-		/*
-		 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds.coGroup(ds2).where(2).equalTo(new
-				KeySelector2()).with(new MixedCoGroup());
-
-		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
-
-		String expected = "0,1,test\n" +
-				"1,2,test\n" +
-				"2,5,test\n" +
-				"3,15,test\n" +
-				"4,33,test\n" +
-				"5,63,test\n" +
-				"6,109,test\n" +
-				"7,4,test\n" +
-				"8,4,test\n" +
-				"9,4,test\n" +
-				"10,5,test\n" +
-				"11,5,test\n" +
-				"12,5,test\n" +
-				"13,5,test\n" +
-				"14,5,test\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class KeySelector2 implements KeySelector<CustomType, Integer> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Integer getKey(CustomType in) {
-			return in.myInt;
-		}
-	}
-
-	@Test
-	public void testCoGroupOnACustomTypeWithKeyExtractorAndATupleInputWithKeyFieldSelector()
-			throws Exception {
-		/*
-		 * CoGroup on a tuple input with key field selector and a custom type input with key extractor
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> coGroupDs = ds2.coGroup(ds).where(new KeySelector3()).equalTo(2).with
-				(new MixedCoGroup2());
-
-		List<CustomType> result = coGroupDs.collect();
-
-		String expected = "0,1,test\n" +
-				"1,2,test\n" +
-				"2,5,test\n" +
-				"3,15,test\n" +
-				"4,33,test\n" +
-				"5,63,test\n" +
-				"6,109,test\n" +
-				"7,4,test\n" +
-				"8,4,test\n" +
-				"9,4,test\n" +
-				"10,5,test\n" +
-				"11,5,test\n" +
-				"12,5,test\n" +
-				"13,5,test\n" +
-				"14,5,test\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class KeySelector3 implements KeySelector<CustomType, Integer> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Integer getKey(CustomType in) {
-			return in.myInt;
-		}
-	}
-
-	@Test
-	public void testCoGroupWithMultipleKeyFieldsWithFieldSelector() throws Exception {
-		/*
-		 * CoGroup with multiple key fields
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-				where(0, 4).equalTo(0, 1).with(new Tuple5Tuple3CoGroup());
-
-		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
-		
-		String expected = "1,1,Hallo\n" +
-				"2,2,Hallo Welt\n" +
-				"3,2,Hallo Welt wie gehts?\n" +
-				"3,2,ABC\n" +
-				"5,3,HIJ\n" +
-				"5,3,IJK\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
-		/*
-		 * CoGroup with multiple key fields
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-				where(new KeySelector7()).
-				equalTo(new KeySelector8()).with(new Tuple5Tuple3CoGroup());
-
-		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
-		
-		String expected = "1,1,Hallo\n" +
-				"2,2,Hallo Welt\n" +
-				"3,2,Hallo Welt wie gehts?\n" +
-				"3,2,ABC\n" +
-				"5,3,HIJ\n" +
-				"5,3,IJK\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
-		/*
-		 * CoGroup with multiple key fields, test working closure cleaner for inner classes
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-				where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
-						Tuple2<Integer, Long>>() {
-					@Override
-					public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
-						return new Tuple2<Integer, Long>(t.f0, t.f4);
-					}
-				}).
-				equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
-
-					@Override
-					public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
-						return new Tuple2<Integer, Long>(t.f0, t.f1);
-					}
-				}).
-				with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
-					@Override
-					public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-					                    Iterable<Tuple3<Integer, Long, String>> second,
-					                    Collector<Tuple3<Integer, Long, String>> out)
-					{
-						List<String> strs = new ArrayList<String>();
-
-						for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
-							strs.add(t.f3);
-						}
-
-						for(Tuple3<Integer, Long, String> t : second) {
-							for(String s : strs) {
-								out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
-							}
-						}
-					}
-				});
-
-		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
-
-		String expected = "1,1,Hallo\n" +
-				"2,2,Hallo Welt\n" +
-				"3,2,Hallo Welt wie gehts?\n" +
-				"3,2,ABC\n" +
-				"5,3,HIJ\n" +
-				"5,3,IJK\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
-		/*
-		 * CoGroup with multiple key fields, test that disabling closure cleaner leads to an exception when using inner
-		 * classes.
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().disableClosureCleaner();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-		boolean correctExceptionTriggered = false;
-		try {
-			DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-					where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
-							Tuple2<Integer, Long>>() {
-						@Override
-						public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
-							return new Tuple2<Integer, Long>(t.f0, t.f4);
-						}
-					}).
-					equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
-
-						@Override
-						public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
-							return new Tuple2<Integer, Long>(t.f0, t.f1);
-						}
-					}).
-					with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
-						@Override
-						public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-						                    Iterable<Tuple3<Integer, Long, String>> second,
-						                    Collector<Tuple3<Integer, Long, String>> out) {
-							List<String> strs = new ArrayList<String>();
-
-							for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
-								strs.add(t.f3);
-							}
-
-							for (Tuple3<Integer, Long, String> t : second) {
-								for (String s : strs) {
-									out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
-								}
-							}
-						}
-					});
-		} catch (InvalidProgramException ex) {
-			correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException);
-		}
-		Assert.assertTrue(correctExceptionTriggered);
-
-	}
-
-	public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
-	Tuple2<Integer, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-			return new Tuple2<Integer, Long>(t.f0, t.f4);
-		}
-	}
-
-	public static class KeySelector8 implements KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
-			return new Tuple2<Integer, Long>(t.f0, t.f1);
-		}
-	}
-
-	@Test
-	public void testCoGroupTwoCustomTypeInputsWithExpressionKeys() throws Exception {
-		/*
-		 * CoGroup on two custom type inputs using expression keys
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<CustomType> coGroupDs = ds.coGroup(ds2).where("myInt").equalTo("myInt").with(new CustomTypeCoGroup());
-
-		List<CustomType> result = coGroupDs.collect();
-		
-		String expected = "1,0,test\n" +
-				"2,6,test\n" +
-				"3,24,test\n" +
-				"4,60,test\n" +
-				"5,120,test\n" +
-				"6,210,test\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testCoGroupOnTwoCustomTypeInputsWithExpressionKeyAndFieldSelector() throws
-			Exception {
-		/*
-		 * CoGroup on two custom type inputs using expression keys
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-				.where("nestedPojo.longNumber").equalTo(6).with(new CoGroup1());
-
-		List<CustomType> result = coGroupDs.collect();
-		
-		String expected = 	"-1,20000,Flink\n" +
-				"-1,10000,Flink\n" +
-				"-1,30000,Flink\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class CoGroup1 implements CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(
-				Iterable<POJO> first,
-				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-				Collector<CustomType> out) throws Exception {
-			for(POJO p : first) {
-				for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-				}
-			}
-		}
-	}
-
-	@Test
-	public void testCoGroupFieldSelectorAndComplicatedKeySelector() throws Exception {
-		/*
-		 * CoGroup field-selector (expression keys) + key selector function
-		 * The key selector is unnecessary complicated (Tuple1) ;)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-				.where(new KeySelector6()).equalTo(6).with(new CoGroup3());
-
-		List<CustomType> result = coGroupDs.collect();
-		
-		String expected = 	"-1,20000,Flink\n" +
-				"-1,10000,Flink\n" +
-				"-1,30000,Flink\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class KeySelector6 implements KeySelector<POJO, Tuple1<Long>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple1<Long> getKey(POJO value)
-		throws Exception {
-			return new Tuple1<Long>(value.nestedPojo.longNumber);
-		}
-	}
-
-	public static class CoGroup3 implements CoGroupFunction<POJO, Tuple7<Integer,
-			String, Integer, Integer, Long, String, Long>, CustomType> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(
-				Iterable<POJO> first,
-				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-				Collector<CustomType> out) throws Exception {
-			for(POJO p : first) {
-				for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-				}
-			}
-		}
-	}
-
-	@Test
-	public void testCoGroupFieldSelectorAndKeySelector() throws Exception {
-		/*
-		 * CoGroup field-selector (expression keys) + key selector function
-		 * The key selector is simple here
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
-		DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
-		DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
-				.where(new KeySelector1()).equalTo(6).with(new CoGroup2());
-
-		List<CustomType> result = coGroupDs.collect();
-		
-		String expected = "-1,20000,Flink\n" +
-				"-1,10000,Flink\n" +
-				"-1,30000,Flink\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithAtomicType1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Integer> ds2 = env.fromElements(0, 1, 2);
-
-		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*").with(new CoGroupAtomic1());
-
-		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
-		
-		String expected = "(1,1,Hi)\n" +
-			"(2,2,Hello)";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithAtomicType2() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Integer> ds1 = env.fromElements(0, 1, 2);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-
-		DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0).with(new CoGroupAtomic2());
-		
-		List<Tuple3<Integer, Long, String>> result = coGroupDs.collect();
-		
-		String expected = "(1,1,Hi)\n" +
-			"(2,2,Hello)";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testCoGroupWithRangePartitioning() throws Exception {
-		/*
-		 * Test coGroup on tuples with multiple key field positions and same customized distribution
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
-
-		env.setParallelism(4);
-		TestDistribution testDis = new TestDistribution();
-		DataSet<Tuple3<Integer, Long, String>> coGrouped =
-				DataSetUtils.partitionByRange(ds1, testDis, 0, 4)
-						.coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1))
-						.where(0, 4)
-						.equalTo(0, 1)
-						.with(new Tuple5Tuple3CoGroup());
-
-		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
-
-		String expected = "1,1,Hallo\n" +
-				"2,2,Hallo Welt\n" +
-				"3,2,Hallo Welt wie gehts?\n" +
-				"3,2,ABC\n" +
-				"5,3,HIJ\n" +
-				"5,3,IJK\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-
-	// --------------------------------------------------------------------------------------------
-	//  UDF classes
-	// --------------------------------------------------------------------------------------------
-	
-	public static class KeySelector1 implements KeySelector<POJO, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long getKey(POJO value)
-		throws Exception {
-			return value.nestedPojo.longNumber;
-		}
-	}
-
-	public static class CoGroup2 implements CoGroupFunction<POJO, Tuple7<Integer, String,
-			Integer, Integer, Long, String, Long>, CustomType> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(
-				Iterable<POJO> first,
-				Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
-				Collector<CustomType> out) throws Exception {
-			for(POJO p : first) {
-				for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
-					Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
-					out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
-				}
-			}
-		}
-	}
-
-	public static class Tuple5CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<Tuple2<Integer, Integer>> out)
-		{
-			int sum = 0;
-			int id = 0;
-			
-			for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) {
-				sum += element.f2;
-				id = element.f0;
-			}
-			
-			for ( Tuple5<Integer, Long, Integer, String, Long> element : second ) {
-				sum += element.f2;
-				id = element.f0;
-			}
-			
-			out.collect(new Tuple2<Integer, Integer>(id, sum));
-		}
-	}
-	
-	public static class CustomTypeCoGroup implements CoGroupFunction<CustomType, CustomType, CustomType> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
-			
-			CustomType o = new CustomType(0,0,"test");
-			
-			for ( CustomType element : first ) {
-				o.myInt = element.myInt;
-				o.myLong += element.myLong;
-			}
-			
-			for ( CustomType element : second ) {
-				o.myInt = element.myInt;
-				o.myLong += element.myLong;
-			}
-			
-			out.collect(o);
-		}
-	}
-	
-	public static class MixedCoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterable<CustomType> second,
-				Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			
-			long sum = 0;
-			int id = 0;
-			
-			for ( Tuple5<Integer, Long, Integer, String, Long> element : first ) {
-				sum += element.f0;
-				id = element.f2;
-			}
-			
-			for (CustomType element : second) {
-				id = element.myInt;
-				sum += element.myLong;
-			}
-			
-			out.collect(new Tuple3<Integer, Long, String>(id, sum, "test"));
-		}
-		
-	}
-	
-	public static class MixedCoGroup2 implements CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterable<CustomType> first,
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<CustomType> out)
-		{
-			CustomType o = new CustomType(0,0,"test");
-			
-			for (CustomType element : first) {
-				o.myInt = element.myInt;
-				o.myLong += element.myLong;
-			}
-			
-			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
-				o.myInt = element.f2;
-				o.myLong += element.f0;
-			}
-			
-			out.collect(o);
-			
-		}
-		
-	}
-	
-	public static class Tuple3ReturnLeft implements CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first,
-				Iterable<Tuple3<Integer, Long, String>> second,
-				Collector<Tuple3<Integer, Long, String>> out)
-		{
-			for (Tuple3<Integer, Long, String> element : first) {
-				if(element.f0 < 6) {
-					out.collect(element);
-				}
-			}
-		}
-	}
-	
-	public static class Tuple5ReturnRight implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
-		
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<Tuple5<Integer, Long, Integer, String, Long>> out)
-		{
-			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
-				if(element.f0 < 4) {
-					out.collect(element);
-				}
-			}
-		}
-	}
-	
-	public static class Tuple5CoGroupBC extends RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-		
-		private int broadcast = 42;
-		
-		@Override
-		public void open(Configuration config) {
-			
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			int sum = 0;
-			for(Integer i : ints) {
-				sum += i;
-			}
-			broadcast = sum;
-			
-		}
-
-		@Override
-		public void coGroup(
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
-				Collector<Tuple3<Integer, Integer, Integer>> out)
-		{
-			int sum = 0;
-			int id = 0;
-			
-			for (Tuple5<Integer, Long, Integer, String, Long> element : first) {
-				sum += element.f2;
-				id = element.f0;
-			}
-			
-			for (Tuple5<Integer, Long, Integer, String, Long> element : second) {
-				sum += element.f2;
-				id = element.f0;
-			}
-			
-			out.collect(new Tuple3<Integer, Integer, Integer>(id, sum, broadcast));
-		}
-	}
-	
-	public static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-		
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
-				Iterable<Tuple3<Integer, Long, String>> second,
-				Collector<Tuple3<Integer, Long, String>> out)
-		{
-			List<String> strs = new ArrayList<String>();
-			
-			for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
-				strs.add(t.f3);
-			}
-			
-			for(Tuple3<Integer, Long, String> t : second) {
-				for(String s : strs) {
-					out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
-				}
-			}
-		}
-	}
-
-	public static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			List<Integer> ints = new ArrayList<Integer>();
-
-			for (Integer i : second) {
-				ints.add(i);
-			}
-
-			for (Tuple3<Integer, Long, String> t : first) {
-				for (Integer i : ints) {
-					if (t.f0.equals(i)) {
-						out.collect(t);
-					}
-				}
-			}
-		}
-	}
-
-	public static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
-			List<Integer> ints = new ArrayList<Integer>();
-
-			for (Integer i : first) {
-				ints.add(i);
-			}
-
-			for (Tuple3<Integer, Long, String> t : second) {
-				for (Integer i : ints) {
-					if (t.f0.equals(i)) {
-						out.collect(t);
-					}
-				}
-			}
-		}
-	}
-
-	public static class TestDistribution implements DataDistribution {
-		public Object[][] boundaries = new Object[][]{
-				new Object[]{2, 2L},
-				new Object[]{5, 4L},
-				new Object[]{10, 12L},
-				new Object[]{21, 6L}
-		};
-
-		public TestDistribution() {}
-
-		@Override
-		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-			return boundaries[bucketNum];
-		}
-
-		@Override
-		public int getNumberOfFields() {
-			return 2;
-		}
-
-		@Override
-		public TypeInformation[] getKeyTypes() {
-			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof TestDistribution; 
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
deleted file mode 100644
index 63d1ec7..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.RichCrossFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-@RunWith(Parameterized.class)
-public class CrossITCase extends MultipleProgramsTestBase {
-
-	public CrossITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
-		/*
-		 * check correctness of cross on two tuple inputs
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<Integer, String>> crossDs = ds.cross(ds2).with(new Tuple5Cross());
-
-		List<Tuple2<Integer, String>> result = crossDs.collect();
-
-		String expected = "0,HalloHallo\n" +
-				"1,HalloHallo Welt\n" +
-				"2,HalloHallo Welt wie\n" +
-				"1,Hallo WeltHallo\n" +
-				"2,Hallo WeltHallo Welt\n" +
-				"3,Hallo WeltHallo Welt wie\n" +
-				"2,Hallo Welt wieHallo\n" +
-				"3,Hallo Welt wieHallo Welt\n" +
-				"4,Hallo Welt wieHallo Welt wie\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception {
-		/*
-		 * check correctness of cross if UDF returns left input object
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new Tuple3ReturnLeft());
-
-		List<Tuple3<Integer, Long, String>> result = crossDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"1,1,Hi\n" +
-				"1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"2,2,Hello\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"3,2,Hello world\n" +
-				"3,2,Hello world\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception {
-		/*
-		 * check correctness of cross if UDF returns right input object
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> crossDs = ds.cross(ds2).with(new Tuple5ReturnRight());
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> result = crossDs
-				.collect();
-
-		String expected = "1,1,0,Hallo,1\n" +
-				"1,1,0,Hallo,1\n" +
-				"1,1,0,Hallo,1\n" +
-				"2,2,1,Hallo Welt,2\n" +
-				"2,2,1,Hallo Welt,2\n" +
-				"2,2,1,Hallo Welt,2\n" +
-				"2,3,2,Hallo Welt wie,1\n" +
-				"2,3,2,Hallo Welt wie,1\n" +
-				"2,3,2,Hallo Welt wie,1\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
-		/*
-		 * check correctness of cross with broadcast set
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple3<Integer, Integer, Integer>> crossDs = ds.cross(ds2).with(new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
-
-		List<Tuple3<Integer, Integer, Integer>> result = crossDs.collect();
-
-		String expected = "2,0,55\n" +
-				"3,0,55\n" +
-				"3,0,55\n" +
-				"3,0,55\n" +
-				"4,1,55\n" +
-				"4,2,55\n" +
-				"3,0,55\n" +
-				"4,2,55\n" +
-				"4,4,55\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossWithHuge() throws Exception {
-		/*
-		 * check correctness of crossWithHuge (only correctness of result -> should be the same as with normal cross)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithHuge(ds2).with(new Tuple5Cross());
-
-		List<Tuple2<Integer, String>> result = crossDs.collect();
-
-		String expected = "0,HalloHallo\n" +
-				"1,HalloHallo Welt\n" +
-				"2,HalloHallo Welt wie\n" +
-				"1,Hallo WeltHallo\n" +
-				"2,Hallo WeltHallo Welt\n" +
-				"3,Hallo WeltHallo Welt wie\n" +
-				"2,Hallo Welt wieHallo\n" +
-				"3,Hallo Welt wieHallo Welt\n" +
-				"4,Hallo Welt wieHallo Welt wie\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossWithTiny() throws Exception {
-		/*
-		 * check correctness of crossWithTiny (only correctness of result -> should be the same as with normal cross)
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<Integer, String>> crossDs = ds.crossWithTiny(ds2).with(new Tuple5Cross());
-
-		List<Tuple2<Integer, String>> result = crossDs.collect();
-
-		String expected = "0,HalloHallo\n" +
-				"1,HalloHallo Welt\n" +
-				"2,HalloHallo Welt wie\n" +
-				"1,Hallo WeltHallo\n" +
-				"2,Hallo WeltHallo Welt\n" +
-				"3,Hallo WeltHallo Welt wie\n" +
-				"2,Hallo Welt wieHallo\n" +
-				"3,Hallo Welt wieHallo Welt\n" +
-				"4,Hallo Welt wieHallo Welt wie\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testProjectCrossOnATupleInput1() throws Exception{
-		/*
-		 * project cross on a tuple input 1
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple6<String, Long, String, Integer, Long, Long>> crossDs = ds.cross(ds2)
-				.projectFirst(2, 1)
-				.projectSecond(3)
-				.projectFirst(0)
-				.projectSecond(4,1);
-
-		List<Tuple6<String, Long, String, Integer, Long, Long>> result = crossDs.collect();
-
-		String expected = "Hi,1,Hallo,1,1,1\n" +
-				"Hi,1,Hallo Welt,1,2,2\n" +
-				"Hi,1,Hallo Welt wie,1,1,3\n" +
-				"Hello,2,Hallo,2,1,1\n" +
-				"Hello,2,Hallo Welt,2,2,2\n" +
-				"Hello,2,Hallo Welt wie,2,1,3\n" +
-				"Hello world,2,Hallo,3,1,1\n" +
-				"Hello world,2,Hallo Welt,3,2,2\n" +
-				"Hello world,2,Hallo Welt wie,3,1,3\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testProjectCrossOnATupleInput2() throws Exception {
-		/*
-		 * project cross on a tuple input 2
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple6<String, String, Long, Long, Long,Integer>> crossDs = ds.cross(ds2)
-				.projectSecond(3)
-				.projectFirst(2, 1)
-				.projectSecond(4,1)
-				.projectFirst(0);
-
-		List<Tuple6<String, String, Long, Long, Long, Integer>> result = crossDs.collect();
-
-		String expected = "Hallo,Hi,1,1,1,1\n" +
-				"Hallo Welt,Hi,1,2,2,1\n" +
-				"Hallo Welt wie,Hi,1,1,3,1\n" +
-				"Hallo,Hello,2,1,1,2\n" +
-				"Hallo Welt,Hello,2,2,2,2\n" +
-				"Hallo Welt wie,Hello,2,1,3,2\n" +
-				"Hallo,Hello world,2,1,1,3\n" +
-				"Hallo Welt,Hello world,2,2,2,3\n" +
-				"Hallo Welt wie,Hello world,2,1,3,3\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfDefaultCross() throws Exception {
-		/*
-		 * check correctness of default cross
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> crossDs = ds.cross(ds2);
-
-		List<Tuple2<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>>> result = crossDs.collect();
-
-		String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n"
-				+
-				"(1,1,Hi),(1,1,0,Hallo,1)\n" +
-				"(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n" +
-				"(2,2,Hello),(2,2,1,Hallo Welt,2)\n" +
-				"(2,2,Hello),(1,1,0,Hallo,1)\n" +
-				"(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n" +
-				"(3,2,Hello world),(2,2,1,Hallo Welt,2)\n" +
-				"(3,2,Hello world),(1,1,0,Hallo,1)\n" +
-				"(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception {
-		/*
-		 * check correctness of cross on two custom type inputs
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-		DataSet<CustomType> crossDs = ds.cross(ds2).with(new CustomTypeCross());
-
-		List<CustomType> result = crossDs.collect();
-
-		String expected = "1,0,HiHi\n"
-				+ "2,1,HiHello\n"
-				+ "2,2,HiHello world\n"
-				+ "2,1,HelloHi\n"
-				+ "4,2,HelloHello\n"
-				+ "4,3,HelloHello world\n"
-				+ "2,2,Hello worldHi\n"
-				+ "4,3,Hello worldHello\n"
-				+ "4,4,Hello worldHello world";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception {
-		/*
-		 * check correctness of cross a tuple input and a custom type input
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> crossDs = ds.cross(ds2).with(new MixedCross());
-
-		List<Tuple3<Integer, Long, String>> result = crossDs.collect();
-
-		String expected = "2,0,HalloHi\n" +
-				"3,0,HalloHello\n" +
-				"3,0,HalloHello world\n" +
-				"3,0,Hallo WeltHi\n" +
-				"4,1,Hallo WeltHello\n" +
-				"4,2,Hallo WeltHello world\n" +
-				"3,0,Hallo Welt wieHi\n" +
-				"4,2,Hallo Welt wieHello\n" +
-				"4,4,Hallo Welt wieHello world\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Tuple5Cross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-
-		@Override
-		public Tuple2<Integer, String> cross(
-				Tuple5<Integer, Long, Integer, String, Long> first,
-				Tuple5<Integer, Long, Integer, String, Long> second)
-				throws Exception {
-
-				return new Tuple2<Integer, String>(first.f2+second.f2, first.f3+second.f3);
-		}
-
-	}
-
-	public static class CustomTypeCross implements CrossFunction<CustomType, CustomType, CustomType> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public CustomType cross(CustomType first, CustomType second)
-				throws Exception {
-
-			return new CustomType(first.myInt * second.myInt, first.myLong + second.myLong, first.myString + second.myString);
-		}
-
-	}
-
-	public static class MixedCross implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<Integer, Long, String> cross(
-				Tuple5<Integer, Long, Integer, String, Long> first,
-				CustomType second) throws Exception {
-
-			return new Tuple3<Integer, Long, String>(first.f0 + second.myInt, first.f2 * second.myLong, first.f3 + second.myString);
-		}
-
-	}
-
-
-	public static class Tuple3ReturnLeft implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<Integer, Long, String> cross(
-				Tuple3<Integer, Long, String> first,
-				Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
-
-			return first;
-		}
-	}
-
-	public static class Tuple5ReturnRight implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple5<Integer, Long, Integer, String, Long> cross(
-				Tuple3<Integer, Long, String> first,
-				Tuple5<Integer, Long, Integer, String, Long> second)
-				throws Exception {
-
-			return second;
-		}
-
-
-	}
-
-	public static class Tuple5CrossBC extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private int broadcast = 42;
-
-		@Override
-		public void open(Configuration config) {
-
-			Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
-			int sum = 0;
-			for(Integer i : ints) {
-				sum += i;
-			}
-			broadcast = sum;
-
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, Integer> cross(
-				Tuple5<Integer, Long, Integer, String, Long> first,
-				Tuple5<Integer, Long, Integer, String, Long> second)
-				throws Exception {
-
-			return new Tuple3<Integer, Integer, Integer>(first.f0 + second.f0, first.f2 * second.f2, broadcast);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
deleted file mode 100644
index 24a09ba..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.fail;
-
-@SuppressWarnings("serial")
-public class CustomDistributionITCase extends TestLogger {
-
-	// ------------------------------------------------------------------------
-	//  The mini cluster that is shared across tests
-	// ------------------------------------------------------------------------
-
-	private static LocalFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-
-	@Before
-	public void prepare() {
-		TestEnvironment clusterEnv = new TestEnvironment(cluster, 1, false);
-		clusterEnv.setAsContext();
-	}
-
-	@After
-	public void cleanup() {
-		TestEnvironment.unsetAsContext();
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Test the record partitioned rightly with one field according to the customized data distribution
-	 */
-	@Test
-	public void testPartitionWithDistribution1() throws Exception {
-		final TestDataDist1 dist = new TestDataDist1();
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(dist.getParallelism());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Boolean> result = DataSetUtils
-			.partitionByRange(input, dist, 0)
-			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
-
-				@Override
-				public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
-					int pIdx = getRuntimeContext().getIndexOfThisSubtask();
-
-					for (Tuple3<Integer, Long, String> s : values) {
-						boolean correctlyPartitioned = true;
-						if (pIdx == 0) {
-							Integer[] upper = dist.boundaries[0];
-							if (s.f0.compareTo(upper[0]) > 0) {
-								correctlyPartitioned = false;
-							}
-						}
-						else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
-							Integer[] lower = dist.boundaries[pIdx - 1];
-							Integer[] upper = dist.boundaries[pIdx];
-							if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-						else {
-							Integer[] lower = dist.boundaries[pIdx - 1];
-							if ((s.f0.compareTo(lower[0]) <= 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-
-						if (!correctlyPartitioned) {
-							fail("Record was not correctly partitioned: " + s.toString());
-						}
-					}
-				}
-			}
-			);
-
-		result.output(new DiscardingOutputFormat<Boolean>());
-		env.execute();
-	}
-
-	/**
-	 * Test the record partitioned rightly with two fields according to the customized data distribution
-	 */
-	@Test
-	public void testRangeWithDistribution2() throws Exception {
-		final TestDataDist2 dist = new TestDataDist2();
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(dist.getParallelism());
-
-		DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
-						new Tuple3<>(1, 5, "Hi"),
-						new Tuple3<>(1, 6, "Hi"),
-						new Tuple3<>(1, 7, "Hi"),
-						new Tuple3<>(1, 11, "Hello"),
-						new Tuple3<>(2, 3, "World"),
-						new Tuple3<>(2, 4, "World"),
-						new Tuple3<>(2, 5, "World"),
-						new Tuple3<>(2, 13, "Hello World"),
-						new Tuple3<>(3, 8, "Say"),
-						new Tuple3<>(4, 0, "Why"),
-						new Tuple3<>(4, 2, "Java"),
-						new Tuple3<>(4, 11, "Say Hello"),
-						new Tuple3<>(5, 1, "Hi Java!"),
-						new Tuple3<>(5, 2, "Hi Java?"),
-						new Tuple3<>(5, 3, "Hi Java again")
-			);
-
-		DataSet<Boolean> result = DataSetUtils
-			.partitionByRange(input, dist, 0, 1)
-			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
-
-				@Override
-				public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
-					int pIdx = getRuntimeContext().getIndexOfThisSubtask();
-					boolean correctlyPartitioned = true;
-
-					for (Tuple3<Integer, Integer, String> s : values) {
-
-						if (pIdx == 0) {
-							Integer[] upper = dist.boundaries[0];
-							if (s.f0.compareTo(upper[0]) > 0 ||
-								(s.f0.compareTo(upper[0]) == 0 && s.f1.compareTo(upper[1]) > 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-						else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
-							Integer[] lower = dist.boundaries[pIdx - 1];
-							Integer[] upper = dist.boundaries[pIdx];
-
-							if (s.f0.compareTo(upper[0]) > 0 ||
-								(s.f0.compareTo(upper[0]) == 0 && s.f1.compareTo(upper[1]) > 0) ||
-								(s.f0.compareTo(lower[0]) < 0) ||
-								(s.f0.compareTo(lower[0]) == 0 && s.f1.compareTo(lower[1]) <= 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-						else {
-							Integer[] lower = dist.boundaries[pIdx - 1];
-							if ((s.f0.compareTo(lower[0]) < 0) ||
-								(s.f0.compareTo(lower[0]) == 0 && s.f1.compareTo(lower[1]) <= 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-
-						if (!correctlyPartitioned) {
-							fail("Record was not correctly partitioned: " + s.toString());
-						}
-					}
-				}
-			}
-			);
-
-		result.output(new DiscardingOutputFormat<Boolean>());
-		env.execute();
-	}
-
-	/*
-	 * Test the number of partition keys less than the number of distribution fields
-	 */
-	@Test
-	public void testPartitionKeyLessDistribution() throws Exception {
-		final TestDataDist2 dist = new TestDataDist2();
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(dist.getParallelism());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		DataSet<Boolean> result = DataSetUtils
-			.partitionByRange(input, dist, 0)
-			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
-
-				@Override
-				public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
-					int pIdx = getRuntimeContext().getIndexOfThisSubtask();
-
-					for (Tuple3<Integer, Long, String> s : values) {
-						boolean correctlyPartitioned = true;
-						if (pIdx == 0) {
-							Integer[] upper = dist.boundaries[0];
-							if (s.f0.compareTo(upper[0]) > 0) {
-								correctlyPartitioned = false;
-							}
-						}
-						else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
-							Integer[] lower = dist.boundaries[pIdx - 1];
-							Integer[] upper = dist.boundaries[pIdx];
-							if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-						else {
-							Integer[] lower = dist.boundaries[pIdx - 1];
-							if ((s.f0.compareTo(lower[0]) <= 0)) {
-								correctlyPartitioned = false;
-							}
-						}
-
-						if (!correctlyPartitioned) {
-							fail("Record was not correctly partitioned: " + s.toString());
-						}
-					}
-				}
-			}
-			);
-
-		result.output(new DiscardingOutputFormat<Boolean>());
-		env.execute();
-	}
-
-	/*
-	 * Test the number of partition keys larger than the number of distribution fields
-	 */
-	@Test(expected = IllegalArgumentException.class)
-	public void testPartitionMoreThanDistribution() throws Exception {
-		final TestDataDist2 dist = new TestDataDist2();
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
-	}
-	
-	/**
-	 * The class is used to do the tests of range partition with one key.
-	 */
-	public static class TestDataDist1 implements DataDistribution {
-
-		public Integer boundaries[][] = new Integer[][]{
-			new Integer[]{4},
-			new Integer[]{9},
-			new Integer[]{13},
-			new Integer[]{18}
-		};
-
-		public TestDataDist1() {}
-
-		public int getParallelism() {
-			return boundaries.length;
-		}
-
-		@Override
-		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-			return boundaries[bucketNum];
-		}
-
-		@Override
-		public int getNumberOfFields() {
-			return 1;
-		}
-
-		@Override
-		public TypeInformation[] getKeyTypes() {
-			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {}
-
-		@Override
-		public void read(DataInputView in) throws IOException {}
-	}
-
-	/**
-	 * The class is used to do the tests of range partition with two keys.
-	 */
-	public static class TestDataDist2 implements DataDistribution {
-
-		public Integer boundaries[][] = new Integer[][]{
-			new Integer[]{1, 6},
-			new Integer[]{2, 4},
-			new Integer[]{3, 9},
-			new Integer[]{4, 1},
-			new Integer[]{5, 2}
-		};
-
-		public TestDataDist2() {}
-
-		public int getParallelism() {
-			return boundaries.length;
-		}
-		
-		@Override
-		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
-			return boundaries[bucketNum];
-		}
-
-		@Override
-		public int getNumberOfFields() {
-			return 2;
-		}
-
-		@Override
-		public TypeInformation[] getKeyTypes() {
-			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {}
-
-		@Override
-		public void read(DataInputView in) throws IOException {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
deleted file mode 100644
index fb62ccd..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSinkITCase.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.BufferedReader;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for data sinks
- */
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class DataSinkITCase extends MultipleProgramsTestBase {
-
-	public DataSinkITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	private String resultPath;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@Test
-	public void testIntSortingParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
-		ds.writeAsText(resultPath).sortLocalOutput("*", Order.DESCENDING).setParallelism(1);
-
-		env.execute();
-
-		String expected = "5\n5\n5\n5\n5\n4\n4\n4\n4\n3\n3\n3\n2\n2\n1\n";
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-
-	}
-
-	@Test
-	public void testStringSortingParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		ds.writeAsText(resultPath).sortLocalOutput("*", Order.ASCENDING).setParallelism(1);
-
-		env.execute();
-
-		String expected = "Hello\n" +
-				"Hello world\n" +
-				"Hello world, how are you?\n" +
-				"Hi\n" +
-				"I am fine.\n" +
-				"LOL\n" +
-				"Luke Skywalker\n" +
-				"Random comment\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testTupleSortingSingleAscParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.ASCENDING).setParallelism(1);
-
-		env.execute();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n" +
-				"6,3,Luke Skywalker\n" +
-				"7,4,Comment#1\n" +
-				"8,4,Comment#2\n" +
-				"9,4,Comment#3\n" +
-				"10,4,Comment#4\n" +
-				"11,5,Comment#5\n" +
-				"12,5,Comment#6\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" +
-				"15,5,Comment#9\n" +
-				"16,6,Comment#10\n" +
-				"17,6,Comment#11\n" +
-				"18,6,Comment#12\n" +
-				"19,6,Comment#13\n" +
-				"20,6,Comment#14\n" +
-				"21,6,Comment#15\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testTupleSortingSingleDescParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		ds.writeAsCsv(resultPath).sortLocalOutput(0, Order.DESCENDING).setParallelism(1);
-
-		env.execute();
-
-		String expected = "21,6,Comment#15\n" +
-				"20,6,Comment#14\n" +
-				"19,6,Comment#13\n" +
-				"18,6,Comment#12\n" +
-				"17,6,Comment#11\n" +
-				"16,6,Comment#10\n" +
-				"15,5,Comment#9\n" +
-				"14,5,Comment#8\n" +
-				"13,5,Comment#7\n" +
-				"12,5,Comment#6\n" +
-				"11,5,Comment#5\n" +
-				"10,4,Comment#4\n" +
-				"9,4,Comment#3\n" +
-				"8,4,Comment#2\n" +
-				"7,4,Comment#1\n" +
-				"6,3,Luke Skywalker\n" +
-				"5,3,I am fine.\n" +
-				"4,3,Hello world, how are you?\n" +
-				"3,2,Hello world\n" +
-				"2,2,Hello\n" +
-				"1,1,Hi\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testTupleSortingDualParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		ds.writeAsCsv(resultPath)
-			.sortLocalOutput(1, Order.DESCENDING).sortLocalOutput(0, Order.ASCENDING)
-			.setParallelism(1);
-
-		env.execute();
-
-		String expected = "16,6,Comment#10\n" +
-				"17,6,Comment#11\n" +
-				"18,6,Comment#12\n" +
-				"19,6,Comment#13\n" +
-				"20,6,Comment#14\n" +
-				"21,6,Comment#15\n" +
-				"11,5,Comment#5\n" +
-				"12,5,Comment#6\n" +
-				"13,5,Comment#7\n" +
-				"14,5,Comment#8\n" +
-				"15,5,Comment#9\n" +
-				"7,4,Comment#1\n" +
-				"8,4,Comment#2\n" +
-				"9,4,Comment#3\n" +
-				"10,4,Comment#4\n" +
-				"4,3,Hello world, how are you?\n" +
-				"5,3,I am fine.\n" +
-				"6,3,Luke Skywalker\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n" +
-				"1,1,Hi\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testTupleSortingNestedParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
-				CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
-		ds.writeAsText(resultPath)
-			.sortLocalOutput("f0.f1", Order.ASCENDING)
-			.sortLocalOutput("f1", Order.DESCENDING)
-			.setParallelism(1);
-
-		env.execute();
-
-		String expected =
-				"((2,1),a,3)\n" +
-				"((2,2),b,4)\n" +
-				"((1,2),a,1)\n" +
-				"((3,3),c,5)\n" +
-				"((1,3),a,2)\n" +
-				"((3,6),c,6)\n" +
-				"((4,9),c,7)\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testTupleSortingNestedParallelism1_2() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
-				CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
-		ds.writeAsText(resultPath)
-			.sortLocalOutput(1, Order.ASCENDING)
-			.sortLocalOutput(2, Order.DESCENDING)
-			.setParallelism(1);
-
-		env.execute();
-
-		String expected =
-				"((2,1),a,3)\n" +
-				"((1,3),a,2)\n" +
-				"((1,2),a,1)\n" +
-				"((2,2),b,4)\n" +
-				"((4,9),c,7)\n" +
-				"((3,6),c,6)\n" +
-				"((3,3),c,5)\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testPojoSortingSingleParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
-		ds.writeAsText(resultPath).sortLocalOutput("number", Order.ASCENDING).setParallelism(1);
-
-		env.execute();
-
-		String expected = "1 First (10,100,1000,One) 10100\n" +
-				"2 First_ (10,105,1000,One) 10200\n" +
-				"3 First (11,102,3000,One) 10200\n" +
-				"4 First_ (11,106,1000,One) 10300\n" +
-				"5 First (11,102,2000,One) 10100\n" +
-				"6 Second_ (20,200,2000,Two) 10100\n" +
-				"7 Third (31,301,2000,Three) 10200\n" +
-				"8 Third_ (30,300,1000,Three) 10100\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testPojoSortingDualParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
-		ds.writeAsText(resultPath)
-			.sortLocalOutput("str", Order.ASCENDING)
-			.sortLocalOutput("number", Order.DESCENDING)
-			.setParallelism(1);
-
-		env.execute();
-
-		String expected =
-				"5 First (11,102,2000,One) 10100\n" +
-				"3 First (11,102,3000,One) 10200\n" +
-				"1 First (10,100,1000,One) 10100\n" +
-				"4 First_ (11,106,1000,One) 10300\n" +
-				"2 First_ (10,105,1000,One) 10200\n" +
-				"6 Second_ (20,200,2000,Two) 10100\n" +
-				"7 Third (31,301,2000,Three) 10200\n" +
-				"8 Third_ (30,300,1000,Three) 10100\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-
-	}
-
-	@Test
-	public void testPojoSortingNestedParallelism1() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getMixedPojoDataSet(env);
-		ds.writeAsText(resultPath)
-			.sortLocalOutput("nestedTupleWithCustom.f0", Order.ASCENDING)
-			.sortLocalOutput("nestedTupleWithCustom.f1.myInt", Order.DESCENDING)
-			.sortLocalOutput("nestedPojo.longNumber", Order.ASCENDING)
-			.setParallelism(1);
-
-		env.execute();
-
-		String expected =
-				"2 First_ (10,105,1000,One) 10200\n" +
-				"1 First (10,100,1000,One) 10100\n" +
-				"4 First_ (11,106,1000,One) 10300\n" +
-				"5 First (11,102,2000,One) 10100\n" +
-				"3 First (11,102,3000,One) 10200\n" +
-				"6 Second_ (20,200,2000,Two) 10100\n" +
-				"8 Third_ (30,300,1000,Three) 10100\n" +
-				"7 Third (31,301,2000,Three) 10200\n";
-
-		compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
-	}
-
-	@Test
-	public void testSortingParallelism4() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Long> ds = env.generateSequence(0, 1000);
-		// randomize
-		ds.map(new MapFunction<Long, Long>() {
-
-			Random rand = new Random(1234L);
-			@Override
-			public Long map(Long value) throws Exception {
-				return rand.nextLong();
-			}
-		}).writeAsText(resultPath)
-			.sortLocalOutput("*", Order.ASCENDING)
-			.setParallelism(4);
-
-		env.execute();
-
-		BufferedReader[] resReaders = getResultReader(resultPath);
-		for (BufferedReader br : resReaders) {
-			long cmp = Long.MIN_VALUE;
-			while(br.ready()) {
-				long cur = Long.parseLong(br.readLine());
-				assertTrue("Invalid order of sorted output", cmp <= cur);
-				cmp = cur;
-			}
-			br.close();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
deleted file mode 100644
index aa40754..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DataSourceITCase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.Assert;
-
-/**
- * Tests for the DataSource
- */
-public class DataSourceITCase extends JavaProgramTestBase {
-
-	private String inputPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inputPath = createTempFile("input", "ab\n"
-				+ "cd\n"
-				+ "ef\n");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		/*
-		 * Test passing a configuration object to an input format
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Configuration ifConf = new Configuration();
-		ifConf.setString("prepend", "test");
-
-		DataSet<String> ds = env.createInput(new TestInputFormat(new Path(inputPath))).withParameters(ifConf);
-		List<String> result = ds.collect();
-
-		String expectedResult = "ab\n"
-				+ "cd\n"
-				+ "ef\n";
-
-		compareResultAsText(result, expectedResult);
-	}
-
-	private static class TestInputFormat extends TextInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		public TestInputFormat(Path filePath) {
-			super(filePath);
-		}
-
-		@Override
-		public void configure(Configuration parameters) {
-			super.configure(parameters);
-
-			Assert.assertNotNull(parameters.getString("prepend", null));
-			Assert.assertEquals("test", parameters.getString("prepend", null));
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
deleted file mode 100644
index 6e30fac..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators;
-
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@SuppressWarnings("serial")
-@RunWith(Parameterized.class)
-public class DistinctITCase extends MultipleProgramsTestBase {
-
-	public DistinctITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector() throws Exception {
-		/*
-		 * check correctness of distinct on tuples with key field selector
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
-
-		List<Tuple3<Integer, Long, String>> result = distinctDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorWithNotAllFieldsSelected() throws Exception{
-		/*
-		 * check correctness of distinct on tuples with key field selector with not all fields selected
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
-
-		List<Tuple1<Integer>> result = distinctDs.collect();
-
-		String expected = "1\n" +
-				"2\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnTuplesWithKeyExtractorFunction() throws Exception {
-		/*
-		 * check correctness of distinct on tuples with key extractor function
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct(new KeySelector1()).project(0);
-
-		List<Tuple1<Integer>> result = reduceDs.collect();
-
-		String expected = "1\n" + "2\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class KeySelector1 implements KeySelector<Tuple5<Integer, Long,  Integer, String, Long>, Integer> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Integer getKey(Tuple5<Integer, Long,  Integer, String, Long> in) {
-			return in.f0;
-		}
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor() throws Exception {
-		/*
-		 * check correctness of distinct on custom type with type extractor
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
-		DataSet<Tuple1<Integer>> reduceDs = ds
-				.distinct(new KeySelector3())
-				.map(new Mapper3());
-
-		List<Tuple1<Integer>> result = reduceDs.collect();
-
-		String expected = "1\n" +
-				"2\n" +
-				"3\n" +
-				"4\n" +
-				"5\n" +
-				"6\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class Mapper3 extends RichMapFunction<CustomType, Tuple1<Integer>> {
-		@Override
-		public Tuple1<Integer> map(CustomType value) throws Exception {
-			return new Tuple1<Integer>(value.myInt);
-		}
-	}
-
-	public static class KeySelector3 implements KeySelector<CustomType, Integer> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Integer getKey(CustomType in) {
-			return in.myInt;
-		}
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnTuples() throws Exception{
-		/*
-		 * check correctness of distinct on tuples
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
-
-		List<Tuple3<Integer, Long, String>> result = distinctDs.collect();
-
-		String expected = "1,1,Hi\n" +
-				"2,2,Hello\n" +
-				"3,2,Hello world\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor() throws Exception{
-		/*
-		 * check correctness of distinct on custom type with tuple-returning type extractor
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-		DataSet<Tuple2<Integer, Long>> reduceDs = ds
-				.distinct(new KeySelector2())
-				.project(0,4);
-
-		List<Tuple2<Integer, Long>> result = reduceDs.collect();
-
-		String expected = "1,1\n" +
-				"2,1\n" +
-				"2,2\n" +
-				"3,2\n" +
-				"3,3\n" +
-				"4,1\n" +
-				"4,2\n" +
-				"5,1\n" +
-				"5,2\n" +
-				"5,3\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	public static class KeySelector2 implements KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
-			return new Tuple2<Integer, Long>(t.f0, t.f4);
-		}
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnTuplesWithFieldExpressions() throws Exception {
-		/*
-		 * check correctness of distinct on tuples with field expressions
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-		DataSet<Tuple1<Integer>> reduceDs = ds.union(ds).distinct("f0").project(0);
-
-		List<Tuple1<Integer>> result = reduceDs.collect();
-
-		String expected = "1\n" +
-				"2\n";
-
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnPojos() throws Exception {
-		/*
-		 * check correctness of distinct on Pojos
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-		DataSet<Integer> reduceDs = ds.distinct("nestedPojo.longNumber").map(new Mapper2());
-
-		List<Integer> result = reduceDs.collect();
-
-		String expected = "10000\n20000\n30000\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Mapper2 implements MapFunction<CollectionDataSets.POJO, Integer> {
-		@Override
-		public Integer map(POJO value) throws Exception {
-			return (int) value.nestedPojo.longNumber;
-		}
-	}
-
-	@Test
-	public void testDistinctOnFullPojo() throws Exception {
-		/*
-		 * distinct on full Pojo
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
-		DataSet<Integer> reduceDs = ds.distinct().map(new Mapper1());
-
-		List<Integer> result = reduceDs.collect();
-
-		String expected = "10000\n20000\n30000\n";
-
-		compareResultAsText(result, expected);
-	}
-
-	public static class Mapper1 implements MapFunction<CollectionDataSets.POJO, Integer> {
-		@Override
-		public Integer map(POJO value) throws Exception {
-			return (int) value.nestedPojo.longNumber;
-		}
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnAtomic() throws Exception {
-		/*
-		 * check correctness of distinct on Integers
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
-		DataSet<Integer> reduceDs = ds.distinct();
-
-		List<Integer> result = reduceDs.collect();
-
-		String expected = "1\n2\n3\n4\n5";
-
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception {
-		/*
-		 * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR
-		 */
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
-		DataSet<String> reduceDs = ds.union(ds).distinct("*");
-
-		List<String> result = reduceDs.collect();
-
-		String expected = "I am fine.\n" +
-				"Luke Skywalker\n" +
-				"LOL\n" +
-				"Hello world, how are you?\n" +
-				"Hi\n" +
-				"Hello world\n" +
-				"Hello\n" +
-				"Random comment\n";
-
-		compareResultAsText(result, expected);
-	}
-}