You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/12 23:44:11 UTC

[11/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
deleted file mode 100644
index ba48e12..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.util;
-
-import java.io.File;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.hadoop.io.IntWritable;
-
-import scala.math.BigInt;
-
-/**
- * #######################################################################################################
- * 
- * 			BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
- * 			IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
- * 
- * #######################################################################################################
- */
-public class CollectionDataSets {
-
-	public static DataSet<Tuple3<Integer, Long, String>> get3TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-		data.add(new Tuple3<>(4, 3L, "Hello world, how are you?"));
-		data.add(new Tuple3<>(5, 3L, "I am fine."));
-		data.add(new Tuple3<>(6, 3L, "Luke Skywalker"));
-		data.add(new Tuple3<>(7, 4L, "Comment#1"));
-		data.add(new Tuple3<>(8, 4L, "Comment#2"));
-		data.add(new Tuple3<>(9, 4L, "Comment#3"));
-		data.add(new Tuple3<>(10, 4L, "Comment#4"));
-		data.add(new Tuple3<>(11, 5L, "Comment#5"));
-		data.add(new Tuple3<>(12, 5L, "Comment#6"));
-		data.add(new Tuple3<>(13, 5L, "Comment#7"));
-		data.add(new Tuple3<>(14, 5L, "Comment#8"));
-		data.add(new Tuple3<>(15, 5L, "Comment#9"));
-		data.add(new Tuple3<>(16, 6L, "Comment#10"));
-		data.add(new Tuple3<>(17, 6L, "Comment#11"));
-		data.add(new Tuple3<>(18, 6L, "Comment#12"));
-		data.add(new Tuple3<>(19, 6L, "Comment#13"));
-		data.add(new Tuple3<>(20, 6L, "Comment#14"));
-		data.add(new Tuple3<>(21, 6L, "Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
-		data.add(new Tuple3<>(1, 1L, "Hi"));
-		data.add(new Tuple3<>(2, 2L, "Hello"));
-		data.add(new Tuple3<>(3, 2L, "Hello world"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
-		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
-		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
-		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
-		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
-		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
-		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
-		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
-		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
-		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
-		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
-		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> getSmall5TupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
-		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
-		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
-		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<Integer, Long, Integer, String, Long>> type = new TupleTypeInfo<>(
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO,
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.LONG_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>();
-		data.add(new Tuple2<>(new Tuple2<>(1, 1), "one"));
-		data.add(new Tuple2<>(new Tuple2<>(2, 2), "two"));
-		data.add(new Tuple2<>(new Tuple2<>(3, 3), "three"));
-
-		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>(
-				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-				BasicTypeInfo.STRING_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
-
-		List<Tuple2<Tuple2<Integer, Integer>, String>> data = new ArrayList<>();
-		data.add(new Tuple2<>(new Tuple2<>(1, 3), "a"));
-		data.add(new Tuple2<>(new Tuple2<>(1, 2), "a"));
-		data.add(new Tuple2<>(new Tuple2<>(2, 1), "a"));
-		data.add(new Tuple2<>(new Tuple2<>(2, 2), "b"));
-		data.add(new Tuple2<>(new Tuple2<>(3, 3), "c"));
-		data.add(new Tuple2<>(new Tuple2<>(3, 6), "c"));
-		data.add(new Tuple2<>(new Tuple2<>(4, 9), "c"));
-
-		TupleTypeInfo<Tuple2<Tuple2<Integer, Integer>, String>> type = new TupleTypeInfo<>(
-				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-				BasicTypeInfo.STRING_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
-
-		List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> data = new ArrayList<>();
-		data.add(new Tuple3<>(new Tuple2<>(1, 3), "a", 2));
-		data.add(new Tuple3<>(new Tuple2<>(1, 2), "a", 1));
-		data.add(new Tuple3<>(new Tuple2<>(2, 1), "a", 3));
-		data.add(new Tuple3<>(new Tuple2<>(2, 2), "b", 4));
-		data.add(new Tuple3<>(new Tuple2<>(3, 3), "c", 5));
-		data.add(new Tuple3<>(new Tuple2<>(3, 6), "c", 6));
-		data.add(new Tuple3<>(new Tuple2<>(4, 9), "c", 7));
-
-		TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> type = new TupleTypeInfo<>(
-				new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-				BasicTypeInfo.STRING_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-	
-	public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
-		List<Tuple2<byte[], Integer>> data = new ArrayList<>();
-		data.add(new Tuple2<>(new byte[]{0, 4}, 1));
-		data.add(new Tuple2<>(new byte[]{2, 0}, 1));
-		data.add(new Tuple2<>(new byte[]{2, 0, 4}, 4));
-		data.add(new Tuple2<>(new byte[]{2, 1}, 3));
-		data.add(new Tuple2<>(new byte[]{0}, 0));
-		data.add(new Tuple2<>(new byte[]{2, 0}, 1));
-				
-		TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>(
-				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
-				BasicTypeInfo.INT_TYPE_INFO
-		);
-		
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {
-
-		List<String> data = new ArrayList<>();
-		data.add("Hi");
-		data.add("Hello");
-		data.add("Hello world");
-		data.add("Hello world, how are you?");
-		data.add("I am fine.");
-		data.add("Luke Skywalker");
-		data.add("Random comment");
-		data.add("LOL");
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Integer> getIntegerDataSet(ExecutionEnvironment env) {
-
-		List<Integer> data = new ArrayList<>();
-		data.add(1);
-		data.add(2);
-		data.add(2);
-		data.add(3);
-		data.add(3);
-		data.add(3);
-		data.add(4);
-		data.add(4);
-		data.add(4);
-		data.add(4);
-		data.add(5);
-		data.add(5);
-		data.add(5);
-		data.add(5);
-		data.add(5);
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
-
-		List<CustomType> data = new ArrayList<>();
-		data.add(new CustomType(1, 0L, "Hi"));
-		data.add(new CustomType(2, 1L, "Hello"));
-		data.add(new CustomType(2, 2L, "Hello world"));
-		data.add(new CustomType(3, 3L, "Hello world, how are you?"));
-		data.add(new CustomType(3, 4L, "I am fine."));
-		data.add(new CustomType(3, 5L, "Luke Skywalker"));
-		data.add(new CustomType(4, 6L, "Comment#1"));
-		data.add(new CustomType(4, 7L, "Comment#2"));
-		data.add(new CustomType(4, 8L, "Comment#3"));
-		data.add(new CustomType(4, 9L, "Comment#4"));
-		data.add(new CustomType(5, 10L, "Comment#5"));
-		data.add(new CustomType(5, 11L, "Comment#6"));
-		data.add(new CustomType(5, 12L, "Comment#7"));
-		data.add(new CustomType(5, 13L, "Comment#8"));
-		data.add(new CustomType(5, 14L, "Comment#9"));
-		data.add(new CustomType(6, 15L, "Comment#10"));
-		data.add(new CustomType(6, 16L, "Comment#11"));
-		data.add(new CustomType(6, 17L, "Comment#12"));
-		data.add(new CustomType(6, 18L, "Comment#13"));
-		data.add(new CustomType(6, 19L, "Comment#14"));
-		data.add(new CustomType(6, 20L, "Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-
-	}
-
-	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
-
-		List<CustomType> data = new ArrayList<>();
-		data.add(new CustomType(1, 0L, "Hi"));
-		data.add(new CustomType(2, 1L, "Hello"));
-		data.add(new CustomType(2, 2L, "Hello world"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-
-	}
-
-	public static class CustomType implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		public int myInt;
-		public long myLong;
-		public String myString;
-
-		public CustomType() {
-		}
-
-		public CustomType(int i, long l, String s) {
-			myInt = i;
-			myLong = l;
-			myString = s;
-		}
-
-		@Override
-		public String toString() {
-			return myInt + "," + myLong + "," + myString;
-		}
-	}
-
-	public static class CustomTypeComparator implements Comparator<CustomType> {
-		@Override
-		public int compare(CustomType o1, CustomType o2) {
-			int diff = o1.myInt - o2.myInt;
-			if (diff != 0) {
-				return diff;
-			}
-			diff = (int) (o1.myLong - o2.myLong);
-			return diff != 0 ? diff : o1.myString.compareTo(o2.myString);
-		}
-
-	}
-
-	public static DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
-		List<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> data = new ArrayList<>();
-		data.add(new Tuple7<>(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new Tuple7<>(2, "Second", 20, 200, 2000L, "Two", 20000L));
-		data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L));
-		return env.fromCollection(data);
-	}
-	
-	public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
-		List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> data = new ArrayList<>();
-		data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First"));
-		data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second"));
-		data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third"));
-		
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
-		List<POJO> data = new ArrayList<>();
-		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
-		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
-		List<POJO> data = new ArrayList<>();
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
-		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
-		List<POJO> data = new ArrayList<>();
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
-		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
-		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
-		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
-		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
-		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
-		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
-		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
-		return env.fromCollection(data);
-	}
-
-	public static class POJO {
-		public int number;
-		public String str;
-		public Tuple2<Integer, CustomType> nestedTupleWithCustom;
-		public NestedPojo nestedPojo;
-		public transient Long ignoreMe;
-
-		public POJO(int i0, String s0,
-					int i1, int i2, long l0, String s1,
-					long l1) {
-			this.number = i0;
-			this.str = s0;
-			this.nestedTupleWithCustom = new Tuple2<>(i1, new CustomType(i2, l0, s1));
-			this.nestedPojo = new NestedPojo();
-			this.nestedPojo.longNumber = l1;
-		}
-
-		public POJO() {
-		}
-
-		@Override
-		public String toString() {
-			return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
-		}
-	}
-
-	public static class NestedPojo {
-		public static Object ignoreMe;
-		public long longNumber;
-
-		public NestedPojo() {
-		}
-	}
-
-	public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
-		List<CrazyNested> data = new ArrayList<>();
-		data.add(new CrazyNested("aa"));
-		data.add(new CrazyNested("bb"));
-		data.add(new CrazyNested("bb"));
-		data.add(new CrazyNested("cc"));
-		data.add(new CrazyNested("cc"));
-		data.add(new CrazyNested("cc"));
-		return env.fromCollection(data);
-	}
-
-	public static class CrazyNested {
-		public CrazyNestedL1 nest_Lvl1;
-		public Long something; // test proper null-value handling
-
-		public CrazyNested() {
-		}
-
-		public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
-			this(set);
-			something = s;
-			nest_Lvl1.a = second;
-		}
-
-		public CrazyNested(String set) {
-			nest_Lvl1 = new CrazyNestedL1();
-			nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
-			nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
-			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4();
-			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = set;
-		}
-	}
-
-	public static class CrazyNestedL1 {
-		public String a;
-		public int b;
-		public CrazyNestedL2 nest_Lvl2;
-	}
-
-	public static class CrazyNestedL2 {
-		public CrazyNestedL3 nest_Lvl3;
-	}
-
-	public static class CrazyNestedL3 {
-		public CrazyNestedL4 nest_Lvl4;
-	}
-
-	public static class CrazyNestedL4 {
-		public String f1nal;
-	}
-
-	// Copied from TypeExtractorTest
-	public static class FromTuple extends Tuple3<String, String, Long> {
-		private static final long serialVersionUID = 1L;
-		public int special;
-	}
-
-	public static class FromTupleWithCTor extends FromTuple {
-
-		private static final long serialVersionUID = 1L;
-
-		public FromTupleWithCTor() {}
-
-		public FromTupleWithCTor(int special, long tupleField) {
-			this.special = special;
-			this.setField(tupleField, 2);
-		}
-	}
-
-	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
-		List<FromTupleWithCTor> data = new ArrayList<>();
-		data.add(new FromTupleWithCTor(1, 10L)); // 3x
-		data.add(new FromTupleWithCTor(1, 10L));
-		data.add(new FromTupleWithCTor(1, 10L));
-		data.add(new FromTupleWithCTor(2, 20L)); // 2x
-		data.add(new FromTupleWithCTor(2, 20L));
-		return env.fromCollection(data);
-	}
-
-	public static class PojoContainingTupleAndWritable {
-		public int someInt;
-		public String someString;
-		public IntWritable hadoopFan;
-		public Tuple2<Long, Long> theTuple;
-
-		public PojoContainingTupleAndWritable() {
-		}
-
-		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
-			hadoopFan = new IntWritable(i);
-			someInt = i;
-			theTuple = new Tuple2<>(l1, l2);
-		}
-	}
-
-	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		return env.fromCollection(data);
-	}
-
-
-
-	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
-		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
-		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Tuple3<Integer, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
-		List<Tuple3<Integer, CrazyNested, POJO>> data = new ArrayList<>();
-		data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
-		data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
-		data.add(new Tuple3<>(1, new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
-		// POJO is not initialized according to the first two fields.
-		data.add(new Tuple3<>(2, new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
-		return env.fromCollection(data);
-	}
-
-	public static class Pojo1 {
-		public String a;
-		public String b;
-
-		public Pojo1() {}
-
-		public Pojo1(String a, String b) {
-			this.a = a;
-			this.b = b;
-		}
-	}
-
-	public static class Pojo2 {
-		public String a2;
-		public String b2;
-	}
-
-	public static class PojoWithMultiplePojos {
-		public Pojo1 p1;
-		public Pojo2 p2;
-		public Integer i0;
-
-		public PojoWithMultiplePojos() {
-		}
-
-		public PojoWithMultiplePojos(String a, String b, String a1, String b1, Integer i0) {
-			p1 = new Pojo1();
-			p1.a = a;
-			p1.b = b;
-			p2 = new Pojo2();
-			p2.a2 = a1;
-			p2.b2 = b1;
-			this.i0 = i0;
-		}
-	}
-
-	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
-		List<PojoWithMultiplePojos> data = new ArrayList<>();
-		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
-		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-		return env.fromCollection(data);
-	}
-
-	public enum Category {
-		CAT_A, CAT_B
-	}
-
-	public static class PojoWithDateAndEnum {
-		public String group;
-		public Date date;
-		public Category cat;
-	}
-	
-	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
-		List<PojoWithDateAndEnum> data = new ArrayList<>();
-		
-		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
-		one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
-		data.add(one);
-		
-		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
-		two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A;
-		data.add(two);
-		
-		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
-		three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B;
-		data.add(three);
-		
-		return env.fromCollection(data);
-	}
-
-	public static class PojoWithCollection {
-		public List<Pojo1> pojos;
-		public int key;
-		public java.sql.Date sqlDate;
-		public BigInteger bigInt;
-		public BigDecimal bigDecimalKeepItNull;
-		public BigInt scalaBigInt;
-		public List<Object> mixed;
-
-		@Override
-		public String toString() {
-			return "PojoWithCollection{" +
-					"pojos.size()=" + pojos.size() +
-					", key=" + key +
-					", sqlDate=" + sqlDate +
-					", bigInt=" + bigInt +
-					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
-					", scalaBigInt=" + scalaBigInt +
-					", mixed=" + mixed +
-					'}';
-		}
-	}
-
-	public static class PojoWithCollectionGeneric {
-		public List<Pojo1> pojos;
-		public int key;
-		public java.sql.Date sqlDate;
-		public BigInteger bigInt;
-		public BigDecimal bigDecimalKeepItNull;
-		public BigInt scalaBigInt;
-		public List<Object> mixed;
-		private PojoWithDateAndEnum makeMeGeneric;
-
-		@Override
-		public String toString() {
-			return "PojoWithCollection{" +
-					"pojos.size()=" + pojos.size() +
-					", key=" + key +
-					", sqlDate=" + sqlDate +
-					", bigInt=" + bigInt +
-					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
-					", scalaBigInt=" + scalaBigInt +
-					", mixed=" + mixed +
-					'}';
-		}
-	}
-
-	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
-		List<PojoWithCollection> data = new ArrayList<>();
-
-		List<Pojo1> pojosList1 = new ArrayList<>();
-		pojosList1.add(new Pojo1("a", "aa"));
-		pojosList1.add(new Pojo1("b", "bb"));
-
-		List<Pojo1> pojosList2 = new ArrayList<>();
-		pojosList2.add(new Pojo1("a2", "aa2"));
-		pojosList2.add(new Pojo1("b2", "bb2"));
-
-		PojoWithCollection pwc1 = new PojoWithCollection();
-		pwc1.pojos = pojosList1;
-		pwc1.key = 0;
-		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-		pwc1.scalaBigInt = BigInt.int2bigInt(10);
-		pwc1.bigDecimalKeepItNull = null;
-		
-		// use calendar to make it stable across time zones
-		GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
-		pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
-		pwc1.mixed = new ArrayList<>();
-		Map<String, Integer> map = new HashMap<>();
-		map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
-		pwc1.mixed.add(map);
-		pwc1.mixed.add(new File("/this/is/wrong"));
-		pwc1.mixed.add("uhlala");
-
-		PojoWithCollection pwc2 = new PojoWithCollection();
-		pwc2.pojos = pojosList2;
-		pwc2.key = 0;
-		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
-		pwc2.bigDecimalKeepItNull = null;
-		
-		GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
-		pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
-
-
-		data.add(pwc1);
-		data.add(pwc2);
-
-		return env.fromCollection(data);
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
deleted file mode 100644
index 04a7bc5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
+++ /dev/null
@@ -1,730 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.javaApiOperators.util;
-
-import java.io.File;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.io.IntWritable;
-
-import scala.math.BigInt;
-
-/**
- * #######################################################################################################
- * 
- * 			BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
- * 			IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
- * 
- * #######################################################################################################
- */
-public class ValueCollectionDataSets {
-
-	public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) {
-		List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
-
-		data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi")));
-		data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello")));
-		data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world")));
-		data.add(new Tuple3<>(new IntValue(4), new LongValue(3l), new StringValue("Hello world, how are you?")));
-		data.add(new Tuple3<>(new IntValue(5), new LongValue(3l), new StringValue("I am fine.")));
-		data.add(new Tuple3<>(new IntValue(6), new LongValue(3l), new StringValue("Luke Skywalker")));
-		data.add(new Tuple3<>(new IntValue(7), new LongValue(4l), new StringValue("Comment#1")));
-		data.add(new Tuple3<>(new IntValue(8), new LongValue(4l), new StringValue("Comment#2")));
-		data.add(new Tuple3<>(new IntValue(9), new LongValue(4l), new StringValue("Comment#3")));
-		data.add(new Tuple3<>(new IntValue(10), new LongValue(4l), new StringValue("Comment#4")));
-		data.add(new Tuple3<>(new IntValue(11), new LongValue(5l), new StringValue("Comment#5")));
-		data.add(new Tuple3<>(new IntValue(12), new LongValue(5l), new StringValue("Comment#6")));
-		data.add(new Tuple3<>(new IntValue(13), new LongValue(5l), new StringValue("Comment#7")));
-		data.add(new Tuple3<>(new IntValue(14), new LongValue(5l), new StringValue("Comment#8")));
-		data.add(new Tuple3<>(new IntValue(15), new LongValue(5l), new StringValue("Comment#9")));
-		data.add(new Tuple3<>(new IntValue(16), new LongValue(6l), new StringValue("Comment#10")));
-		data.add(new Tuple3<>(new IntValue(17), new LongValue(6l), new StringValue("Comment#11")));
-		data.add(new Tuple3<>(new IntValue(18), new LongValue(6l), new StringValue("Comment#12")));
-		data.add(new Tuple3<>(new IntValue(19), new LongValue(6l), new StringValue("Comment#13")));
-		data.add(new Tuple3<>(new IntValue(20), new LongValue(6l), new StringValue("Comment#14")));
-		data.add(new Tuple3<>(new IntValue(21), new LongValue(6l), new StringValue("Comment#15")));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) {
-		List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
-
-		data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi")));
-		data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello")));
-		data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world")));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) {
-		List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
-
-		data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
-		data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
-		data.add(new Tuple5<>(new IntValue(3), new LongValue(4l), new IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(3), new LongValue(5l), new IntValue(4), new StringValue("ABC"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(3), new LongValue(6l), new IntValue(5), new StringValue("BCD"), new LongValue(3l)));
-		data.add(new Tuple5<>(new IntValue(4), new LongValue(7l), new IntValue(6), new StringValue("CDE"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(4), new LongValue(8l), new IntValue(7), new StringValue("DEF"), new LongValue(1l)));
-		data.add(new Tuple5<>(new IntValue(4), new LongValue(9l), new IntValue(8), new StringValue("EFG"), new LongValue(1l)));
-		data.add(new Tuple5<>(new IntValue(4), new LongValue(10l), new IntValue(9), new StringValue("FGH"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(5), new LongValue(11l), new IntValue(10), new StringValue("GHI"), new LongValue(1l)));
-		data.add(new Tuple5<>(new IntValue(5), new LongValue(12l), new IntValue(11), new StringValue("HIJ"), new LongValue(3l)));
-		data.add(new Tuple5<>(new IntValue(5), new LongValue(13l), new IntValue(12), new StringValue("IJK"), new LongValue(3l)));
-		data.add(new Tuple5<>(new IntValue(5), new LongValue(14l), new IntValue(13), new StringValue("JKL"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(5), new LongValue(15l), new IntValue(14), new StringValue("KLM"), new LongValue(2l)));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
-			TupleTypeInfo<>(
-				ValueTypeInfo.INT_VALUE_TYPE_INFO,
-				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
-				ValueTypeInfo.INT_VALUE_TYPE_INFO,
-				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
-				ValueTypeInfo.LONG_VALUE_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) {
-		List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
-
-		data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
-		data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
-		data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
-
-		Collections.shuffle(data);
-
-		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
-			TupleTypeInfo<>(
-				ValueTypeInfo.INT_VALUE_TYPE_INFO,
-				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
-				ValueTypeInfo.INT_VALUE_TYPE_INFO,
-				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
-				ValueTypeInfo.LONG_VALUE_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
-		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
-
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(1)), new StringValue("one")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("two")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("three")));
-
-		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
-			TupleTypeInfo<>(
-				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
-				ValueTypeInfo.STRING_VALUE_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
-		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
-
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(3)), new StringValue("a")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(2)), new StringValue("a")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(1)), new StringValue("a")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("b")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("c")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(6)), new StringValue("c")));
-		data.add(new Tuple2<>(new Tuple2<>(new IntValue(4), new IntValue(9)), new StringValue("c")));
-
-		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
-			TupleTypeInfo<>(
-				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
-				ValueTypeInfo.STRING_VALUE_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-
-	public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
-		List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> data = new ArrayList<>();
-
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2)));
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1)));
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3)));
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4)));
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5)));
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6)));
-		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7)));
-
-		TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> type = new
-			TupleTypeInfo<>(
-				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
-				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
-				ValueTypeInfo.INT_VALUE_TYPE_INFO
-		);
-
-		return env.fromCollection(data, type);
-	}
-	
-	public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) {
-		List<StringValue> data = new ArrayList<>();
-
-		data.add(new StringValue("Hi"));
-		data.add(new StringValue("Hello"));
-		data.add(new StringValue("Hello world"));
-		data.add(new StringValue("Hello world, how are you?"));
-		data.add(new StringValue("I am fine."));
-		data.add(new StringValue("Luke Skywalker"));
-		data.add(new StringValue("Random comment"));
-		data.add(new StringValue("LOL"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) {
-		List<IntValue> data = new ArrayList<>();
-
-		data.add(new IntValue(1));
-		data.add(new IntValue(2));
-		data.add(new IntValue(2));
-		data.add(new IntValue(3));
-		data.add(new IntValue(3));
-		data.add(new IntValue(3));
-		data.add(new IntValue(4));
-		data.add(new IntValue(4));
-		data.add(new IntValue(4));
-		data.add(new IntValue(4));
-		data.add(new IntValue(5));
-		data.add(new IntValue(5));
-		data.add(new IntValue(5));
-		data.add(new IntValue(5));
-		data.add(new IntValue(5));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
-		List<CustomType> data = new ArrayList<CustomType>();
-
-		data.add(new CustomType(1, 0l, "Hi"));
-		data.add(new CustomType(2, 1l, "Hello"));
-		data.add(new CustomType(2, 2l, "Hello world"));
-		data.add(new CustomType(3, 3l, "Hello world, how are you?"));
-		data.add(new CustomType(3, 4l, "I am fine."));
-		data.add(new CustomType(3, 5l, "Luke Skywalker"));
-		data.add(new CustomType(4, 6l, "Comment#1"));
-		data.add(new CustomType(4, 7l, "Comment#2"));
-		data.add(new CustomType(4, 8l, "Comment#3"));
-		data.add(new CustomType(4, 9l, "Comment#4"));
-		data.add(new CustomType(5, 10l, "Comment#5"));
-		data.add(new CustomType(5, 11l, "Comment#6"));
-		data.add(new CustomType(5, 12l, "Comment#7"));
-		data.add(new CustomType(5, 13l, "Comment#8"));
-		data.add(new CustomType(5, 14l, "Comment#9"));
-		data.add(new CustomType(6, 15l, "Comment#10"));
-		data.add(new CustomType(6, 16l, "Comment#11"));
-		data.add(new CustomType(6, 17l, "Comment#12"));
-		data.add(new CustomType(6, 18l, "Comment#13"));
-		data.add(new CustomType(6, 19l, "Comment#14"));
-		data.add(new CustomType(6, 20l, "Comment#15"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
-		List<CustomType> data = new ArrayList<CustomType>();
-
-		data.add(new CustomType(1, 0l, "Hi"));
-		data.add(new CustomType(2, 1l, "Hello"));
-		data.add(new CustomType(2, 2l, "Hello world"));
-
-		Collections.shuffle(data);
-
-		return env.fromCollection(data);
-	}
-
-	public static class CustomType implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		public IntValue myInt;
-		public LongValue myLong;
-		public StringValue myString;
-
-		public CustomType() {
-		}
-
-		public CustomType(int i, long l, String s) {
-			myInt = new IntValue(i);
-			myLong = new LongValue(l);
-			myString = new StringValue(s);
-		}
-
-		@Override
-		public String toString() {
-			return myInt + "," + myLong + "," + myString;
-		}
-	}
-
-	public static class CustomTypeComparator implements Comparator<CustomType> {
-
-		@Override
-		public int compare(CustomType o1, CustomType o2) {
-			int diff = o1.myInt.getValue() - o2.myInt.getValue();
-			if (diff != 0) {
-				return diff;
-			}
-			diff = (int) (o1.myLong.getValue() - o2.myLong.getValue());
-			return diff != 0 ? diff : o1.myString.getValue().compareTo(o2.myString.getValue());
-		}
-
-	}
-
-	public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
-		List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> data = new ArrayList<>();
-		
-		data.add(new Tuple7<>(new IntValue(1), new StringValue("First"), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new LongValue(10000L)));
-		data.add(new Tuple7<>(new IntValue(2), new StringValue("Second"), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new LongValue(20000L)));
-		data.add(new Tuple7<>(new IntValue(3), new StringValue("Third"), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new LongValue(30000L)));
-
-		return env.fromCollection(data);
-	}
-	
-	public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
-		List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> data = new ArrayList<>();
-		
-		data.add(new Tuple7<>(new LongValue(10000L), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new IntValue(1), new StringValue("First")));
-		data.add(new Tuple7<>(new LongValue(20000L), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new IntValue(2), new StringValue("Second")));
-		data.add(new Tuple7<>(new LongValue(30000L), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new IntValue(3), new StringValue("Third")));
-		
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
-		List<POJO> data = new ArrayList<POJO>();
-
-		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
-		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
-		List<POJO> data = new ArrayList<POJO>();
-
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
-		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
-		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
-		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
-
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
-		List<POJO> data = new ArrayList<POJO>();
-
-		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // 5x
-		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
-		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
-		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
-		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
-		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
-		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L)); // 2x
-		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
-
-		return env.fromCollection(data);
-	}
-
-	public static class POJO {
-		public IntValue number;
-		public StringValue str;
-		public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
-		public NestedPojo nestedPojo;
-		public transient LongValue ignoreMe;
-
-		public POJO(int i0, String s0,
-					int i1, int i2, long l0, String s1,
-					long l1) {
-			this.number = new IntValue(i0);
-			this.str = new StringValue(s0);
-			this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1));
-			this.nestedPojo = new NestedPojo();
-			this.nestedPojo.longNumber = new LongValue(l1);
-		}
-
-		public POJO() {
-		}
-
-		@Override
-		public String toString() {
-			return number + " " + str + " " + nestedTupleWithCustom + " " + nestedPojo.longNumber;
-		}
-	}
-
-	public static class NestedPojo {
-		public static Object ignoreMe;
-		public LongValue longNumber;
-
-		public NestedPojo() {
-		}
-	}
-
-	public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
-		List<CrazyNested> data = new ArrayList<CrazyNested>();
-
-		data.add(new CrazyNested("aa"));
-		data.add(new CrazyNested("bb"));
-		data.add(new CrazyNested("bb"));
-		data.add(new CrazyNested("cc"));
-		data.add(new CrazyNested("cc"));
-		data.add(new CrazyNested("cc"));
-
-		return env.fromCollection(data);
-	}
-
-	public static class CrazyNested {
-		public CrazyNestedL1 nest_Lvl1;
-		public LongValue something; // test proper null-value handling
-
-		public CrazyNested() {
-		}
-
-		public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
-			this(set);
-			something = new LongValue(s);
-			nest_Lvl1.a = new StringValue(second);
-		}
-
-		public CrazyNested(String set) {
-			nest_Lvl1 = new CrazyNestedL1();
-			nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
-			nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
-			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4();
-			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = new StringValue(set);
-		}
-	}
-
-	public static class CrazyNestedL1 {
-		public StringValue a;
-		public IntValue b;
-		public CrazyNestedL2 nest_Lvl2;
-	}
-
-	public static class CrazyNestedL2 {
-		public CrazyNestedL3 nest_Lvl3;
-	}
-
-	public static class CrazyNestedL3 {
-		public CrazyNestedL4 nest_Lvl4;
-	}
-
-	public static class CrazyNestedL4 {
-		public StringValue f1nal;
-	}
-
-	// Copied from TypeExtractorTest
-	public static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> {
-		private static final long serialVersionUID = 1L;
-		public IntValue special;
-	}
-
-	public static class FromTupleWithCTor extends FromTuple {
-
-		private static final long serialVersionUID = 1L;
-
-		public FromTupleWithCTor() {}
-
-		public FromTupleWithCTor(int special, long tupleField) {
-			this.special = new IntValue(special);
-			this.setField(new LongValue(tupleField), 2);
-		}
-	}
-
-	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
-		List<FromTupleWithCTor> data = new ArrayList<>();
-		data.add(new FromTupleWithCTor(1, 10L)); // 3x
-		data.add(new FromTupleWithCTor(1, 10L));
-		data.add(new FromTupleWithCTor(1, 10L));
-		data.add(new FromTupleWithCTor(2, 20L)); // 2x
-		data.add(new FromTupleWithCTor(2, 20L));
-		return env.fromCollection(data);
-	}
-
-	public static class PojoContainingTupleAndWritable {
-		public IntValue someInt;
-		public StringValue someString;
-		public IntWritable hadoopFan;
-		public Tuple2<LongValue, LongValue> theTuple;
-
-		public PojoContainingTupleAndWritable() {
-		}
-
-		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
-			hadoopFan = new IntWritable(i);
-			someInt = new IntValue(i);
-			theTuple = new Tuple2<>(new LongValue(l1), new LongValue(l2));
-		}
-	}
-
-	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
-		return env.fromCollection(data);
-	}
-
-
-
-	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
-		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
-		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
-		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
-		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
-		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
-		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
-		return env.fromCollection(data);
-	}
-
-	public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
-		List<Tuple3<IntValue, CrazyNested, POJO>> data = new ArrayList<>();
-		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
-		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
-		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
-		// POJO is not initialized according to the first two fields.
-		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(2), new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
-		return env.fromCollection(data);
-	}
-
-	public static class Pojo1 {
-		public StringValue a;
-		public StringValue b;
-
-		public Pojo1() {}
-
-		public Pojo1(String a, String b) {
-			this.a = new StringValue(a);
-			this.b = new StringValue(b);
-		}
-	}
-
-	public static class Pojo2 {
-		public StringValue a2;
-		public StringValue b2;
-	}
-
-	public static class PojoWithMultiplePojos {
-		public Pojo1 p1;
-		public Pojo2 p2;
-		public IntValue i0;
-
-		public PojoWithMultiplePojos() {
-		}
-
-		public PojoWithMultiplePojos(String a, String b, String a1, String b1, int i0) {
-			p1 = new Pojo1();
-			p1.a = new StringValue(a);
-			p1.b = new StringValue(b);
-			p2 = new Pojo2();
-			p2.a2 = new StringValue(a1);
-			p2.b2 = new StringValue(b1);
-			this.i0 = new IntValue(i0);
-		}
-	}
-
-	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
-		List<PojoWithMultiplePojos> data = new ArrayList<>();
-		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
-		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
-		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
-		return env.fromCollection(data);
-	}
-
-	public enum Category {
-		CAT_A, CAT_B;
-	}
-
-	public static class PojoWithDateAndEnum {
-		public StringValue group;
-		public Date date;
-		public Category cat;
-	}
-	
-	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
-		List<PojoWithDateAndEnum> data = new ArrayList<PojoWithDateAndEnum>();
-		
-		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
-		one.group = new StringValue("a");
-		one.date = new Date(666);
-		one.cat = Category.CAT_A;
-		data.add(one);
-		
-		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
-		two.group = new StringValue("a");
-		two.date = new Date(666);
-		two.cat = Category.CAT_A;
-		data.add(two);
-		
-		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
-		three.group = new StringValue("b");
-		three.date = new Date(666);
-		three.cat = Category.CAT_B;
-		data.add(three);
-		
-		return env.fromCollection(data);
-	}
-
-	public static class PojoWithCollection {
-		public List<Pojo1> pojos;
-		public IntValue key;
-		public java.sql.Date sqlDate;
-		public BigInteger bigInt;
-		public BigDecimal bigDecimalKeepItNull;
-		public BigInt scalaBigInt;
-		public List<Object> mixed;
-
-		@Override
-		public String toString() {
-			return "PojoWithCollection{" +
-					"pojos.size()=" + pojos.size() +
-					", key=" + key +
-					", sqlDate=" + sqlDate +
-					", bigInt=" + bigInt +
-					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
-					", scalaBigInt=" + scalaBigInt +
-					", mixed=" + mixed +
-					'}';
-		}
-	}
-
-	public static class PojoWithCollectionGeneric {
-		public List<Pojo1> pojos;
-		public IntValue key;
-		public java.sql.Date sqlDate;
-		public BigInteger bigInt;
-		public BigDecimal bigDecimalKeepItNull;
-		public BigInt scalaBigInt;
-		public List<Object> mixed;
-		private PojoWithDateAndEnum makeMeGeneric;
-
-		@Override
-		public String toString() {
-			return "PojoWithCollection{" +
-					"pojos.size()=" + pojos.size() +
-					", key=" + key +
-					", sqlDate=" + sqlDate +
-					", bigInt=" + bigInt +
-					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
-					", scalaBigInt=" + scalaBigInt +
-					", mixed=" + mixed +
-					'}';
-		}
-	}
-
-	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
-		List<PojoWithCollection> data = new ArrayList<>();
-
-		List<Pojo1> pojosList1 = new ArrayList<>();
-		pojosList1.add(new Pojo1("a", "aa"));
-		pojosList1.add(new Pojo1("b", "bb"));
-
-		List<Pojo1> pojosList2 = new ArrayList<>();
-		pojosList2.add(new Pojo1("a2", "aa2"));
-		pojosList2.add(new Pojo1("b2", "bb2"));
-
-		PojoWithCollection pwc1 = new PojoWithCollection();
-		pwc1.pojos = pojosList1;
-		pwc1.key = new IntValue(0);
-		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-		pwc1.scalaBigInt = BigInt.int2bigInt(10);
-		pwc1.bigDecimalKeepItNull = null;
-		
-		// use calendar to make it stable across time zones
-		GregorianCalendar gcl1 = new GregorianCalendar(2033, 04, 18);
-		pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
-		pwc1.mixed = new ArrayList<Object>();
-		Map<StringValue, IntValue> map = new HashMap<>();
-		map.put(new StringValue("someKey"), new IntValue(1));
-		pwc1.mixed.add(map);
-		pwc1.mixed.add(new File("/this/is/wrong"));
-		pwc1.mixed.add("uhlala");
-
-		PojoWithCollection pwc2 = new PojoWithCollection();
-		pwc2.pojos = pojosList2;
-		pwc2.key = new IntValue(0);
-		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
-		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
-		pwc2.bigDecimalKeepItNull = null;
-		
-		GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
-		pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
-
-		data.add(pwc1);
-		data.add(pwc2);
-
-		return env.fromCollection(data);
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
index aabe7c0..6413a3b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckForbiddenMethodsUsage.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.manual;
 
 import org.apache.flink.types.parser.FieldParserTest;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.reflections.Reflections;
 import org.reflections.scanners.MemberUsageScanner;
 import org.reflections.util.ClasspathHelper;
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 
 /**
  * Tests via reflection that certain methods are not called in Flink.
- * 
+ *
  * <p>Forbidden calls include:
  *   - Byte / String conversions that do not specify an explicit charset
  *     because they produce different results in different locales
@@ -116,11 +116,10 @@ public class CheckForbiddenMethodsUsage {
 			.addUrls(ClasspathHelper.forPackage("org.apache.flink"))
 			.addScanners(new MemberUsageScanner()));
 
-
 		for (ForbiddenCall forbiddenCall : forbiddenCalls) {
 			final Set<Member> methodUsages = forbiddenCall.getUsages(reflections);
 			methodUsages.removeAll(forbiddenCall.getExclusions());
-			assertEquals("Unexpected calls: " + methodUsages,0, methodUsages.size());
+			assertEquals("Unexpected calls: " + methodUsages, 0, methodUsages.size());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index 0692196..f02cf1c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -40,23 +40,26 @@ import java.util.List;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Manual test for growing hash tables.
+ */
 public class HashTableRecordWidthCombinations {
 
 	public static void main(String[] args) throws Exception {
 
 		@SuppressWarnings("unchecked")
-		final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer = 
+		final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer =
 				new TupleSerializer<Tuple2<Long, byte[]>>(
 						(Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class,
 						new TypeSerializer<?>[] { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE });
-		
+
 		final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE;
 
 		final TypeComparator<Tuple2<Long, byte[]>> buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(
 				new int[] {0},
 				new TypeComparator<?>[] { new LongComparator(true) },
 				new TypeSerializer<?>[] { LongSerializer.INSTANCE });
-		
+
 		final TypeComparator<Long> probeComparator = new LongComparator(true);
 
 		final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
@@ -85,7 +88,7 @@ public class HashTableRecordWidthCombinations {
 		final IOManager ioMan = new IOManagerAsync();
 
 		try {
-			final int pageSize = 32*1024;
+			final int pageSize = 32 * 1024;
 			final int numSegments = 34;
 
 			for (int num = 3400; num < 3550; num++) {
@@ -151,7 +154,7 @@ public class HashTableRecordWidthCombinations {
 					try {
 						while (table.nextRecord()) {
 							MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator();
-							while (matches.next() != null);
+							while (matches.next() != null) {}
 						}
 					}
 					catch (RuntimeException e) {
@@ -176,11 +179,11 @@ public class HashTableRecordWidthCombinations {
 			ioMan.shutdown();
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
 		ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
 		for (int i = 0; i < numSegments; i++) {
@@ -188,7 +191,7 @@ public class HashTableRecordWidthCombinations {
 		}
 		return list;
 	}
-	
+
 	private static void checkNoTempFilesRemain(IOManager ioManager) {
 		for (File dir : ioManager.getSpillingDirectories()) {
 			for (String file : dir.list()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index 9821b05..c69e6fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.test.manual;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,13 +33,24 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Assert;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test {@link UnilateralSortMerger} on a large set of {@code String}.
+ */
 public class MassiveStringSorting {
 
 	private static final long SEED = 347569784659278346L;
-	
-	
+
 	public void testStringSorting() {
 		File input = null;
 		File sorted = null;
@@ -55,12 +58,12 @@ public class MassiveStringSorting {
 		try {
 			// the source file
 			input = generateFileWithStrings(300000, "http://some-uri.com/that/is/a/common/prefix/to/all");
-			
+
 			// the sorted file
 			sorted = File.createTempFile("sorted_strings", "txt");
-			
-			String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
-			
+
+			String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
+
 			Process p = null;
 			try {
 				p = Runtime.getRuntime().exec(command);
@@ -74,37 +77,37 @@ public class MassiveStringSorting {
 					p.destroy();
 				}
 			}
-			
+
 			// sort the data
 			UnilateralSortMerger<String> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
-			
+
 			try {
 				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
-					
+
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
 				TypeComparator<String> comparator = new StringComparator(true);
-				
+
 				reader = new BufferedReader(new FileReader(input));
 				MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader);
-				
+
 				sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(),
 						new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f,
 						true /* use large record handler */, false);
 
 				MutableObjectIterator<String> sortedData = sorter.getIterator();
-				
+
 				reader.close();
-				
+
 				// verify
 				verifyReader = new BufferedReader(new FileReader(sorted));
 				String next;
-				
+
 				while ((next = verifyReader.readLine()) != null) {
 					String nextFromStratoSort = sortedData.next("");
-					
+
 					Assert.assertNotNull(nextFromStratoSort);
 					Assert.assertEquals(next, nextFromStratoSort);
 				}
@@ -135,23 +138,23 @@ public class MassiveStringSorting {
 			}
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void testStringTuplesSorting() {
-		final int NUM_STRINGS = 300000;
-		
+		final int numStrings = 300000;
+
 		File input = null;
 		File sorted = null;
 
 		try {
 			// the source file
-			input = generateFileWithStringTuples(NUM_STRINGS, "http://some-uri.com/that/is/a/common/prefix/to/all");
-			
+			input = generateFileWithStringTuples(numStrings, "http://some-uri.com/that/is/a/common/prefix/to/all");
+
 			// the sorted file
 			sorted = File.createTempFile("sorted_strings", "txt");
-			
-			String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
-			
+
+			String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
+
 			Process p = null;
 			try {
 				p = Runtime.getRuntime().exec(command);
@@ -165,33 +168,31 @@ public class MassiveStringSorting {
 					p.destroy();
 				}
 			}
-			
+
 			// sort the data
 			UnilateralSortMerger<Tuple2<String, String[]>> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
-			
+
 			try {
 				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
-					
-				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>) 
+
+				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>)
 						TypeInfoParser.<Tuple2<String, String[]>>parse("Tuple2<String, String[]>");
 
 				TypeSerializer<Tuple2<String, String[]>> serializer = typeInfo.createSerializer(new ExecutionConfig());
 				TypeComparator<Tuple2<String, String[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0, new ExecutionConfig());
-				
+
 				reader = new BufferedReader(new FileReader(input));
 				MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader);
-				
+
 				sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
 						new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f,
 						true /* use large record handler */, false);
 
-				
-				
 				// use this part to verify that all if good when sorting in memory
-				
+
 //				List<MemorySegment> memory = mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024));
 //				NormalizedKeySorter<Tuple2<String, String[]>> nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, memory);
 //
@@ -200,36 +201,36 @@ public class MassiveStringSorting {
 //					while ((wi = inputIterator.next(wi)) != null) {
 //						Assert.assertTrue(nks.write(wi));
 //					}
-//					
+//
 //					new QuickSort().sort(nks);
 //				}
-//				
+//
 //				MutableObjectIterator<Tuple2<String, String[]>> sortedData = nks.getIterator();
-				
+
 				MutableObjectIterator<Tuple2<String, String[]>> sortedData = sorter.getIterator();
 				reader.close();
-				
+
 				// verify
 				verifyReader = new BufferedReader(new FileReader(sorted));
 				MutableObjectIterator<Tuple2<String, String[]>> verifyIterator = new StringTupleReaderMutableObjectIterator(verifyReader);
-				
+
 				Tuple2<String, String[]> next = new Tuple2<String, String[]>("", new String[0]);
 				Tuple2<String, String[]> nextFromStratoSort = new Tuple2<String, String[]>("", new String[0]);
-				
+
 				int num = 0;
-				
+
 				while ((next = verifyIterator.next(next)) != null) {
 					num++;
-					
+
 					nextFromStratoSort = sortedData.next(nextFromStratoSort);
 					Assert.assertNotNull(nextFromStratoSort);
-					
+
 					Assert.assertEquals(next.f0, nextFromStratoSort.f0);
 					Assert.assertArrayEquals(next.f1, nextFromStratoSort.f1);
 				}
-				
+
 				Assert.assertNull(sortedData.next(nextFromStratoSort));
-				Assert.assertEquals(NUM_STRINGS, num);
+				Assert.assertEquals(numStrings, num);
 
 			}
 			finally {
@@ -260,15 +261,15 @@ public class MassiveStringSorting {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static final class StringReaderMutableObjectIterator implements MutableObjectIterator<String> {
-		
+
 		private final BufferedReader reader;
 
 		public StringReaderMutableObjectIterator(BufferedReader reader) {
 			this.reader = reader;
 		}
-		
+
 		@Override
 		public String next(String reuse) throws IOException {
 			return reader.readLine();
@@ -279,22 +280,22 @@ public class MassiveStringSorting {
 			return reader.readLine();
 		}
 	}
-	
+
 	private static final class StringTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<String, String[]>> {
-		
+
 		private final BufferedReader reader;
 
 		public StringTupleReaderMutableObjectIterator(BufferedReader reader) {
 			this.reader = reader;
 		}
-		
+
 		@Override
 		public Tuple2<String, String[]> next(Tuple2<String, String[]> reuse) throws IOException {
 			String line = reader.readLine();
 			if (line == null) {
 				return null;
 			}
-			
+
 			String[] parts = line.split(" ");
 			reuse.f0 = parts[0];
 			reuse.f1 = parts;
@@ -306,31 +307,31 @@ public class MassiveStringSorting {
 			return next(new Tuple2<String, String[]>());
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private File generateFileWithStrings(int numStrings, String prefix) throws IOException {
 		final Random rnd = new Random(SEED);
-		
+
 		final StringBuilder bld = new StringBuilder();
 		final int resetValue = prefix.length();
-		
+
 		bld.append(prefix);
-		
+
 		File f = File.createTempFile("strings", "txt");
 		BufferedWriter wrt = null;
 		try {
 			wrt = new BufferedWriter(new FileWriter(f));
-		
-			for (int i = 0 ; i < numStrings; i++) {
+
+			for (int i = 0; i < numStrings; i++) {
 				bld.setLength(resetValue);
-				
+
 				int len = rnd.nextInt(20) + 300;
 				for (int k = 0; k < len; k++) {
 					char c = (char) (rnd.nextInt(80) + 40);
 					bld.append(c);
 				}
-				
+
 				String str = bld.toString();
 				wrt.write(str);
 				wrt.newLine();
@@ -338,52 +339,52 @@ public class MassiveStringSorting {
 		} finally {
 			wrt.close();
 		}
-		
+
 		return f;
 	}
-	
+
 	private File generateFileWithStringTuples(int numStrings, String prefix) throws IOException {
 		final Random rnd = new Random(SEED);
-		
+
 		final StringBuilder bld = new StringBuilder();
 
 		File f = File.createTempFile("strings", "txt");
 		BufferedWriter wrt = null;
 		try {
 			wrt = new BufferedWriter(new FileWriter(f));
-		
-			for (int i = 0 ; i < numStrings; i++) {
+
+			for (int i = 0; i < numStrings; i++) {
 				bld.setLength(0);
-				
+
 				int numComps = rnd.nextInt(5) + 1;
-				
+
 				for (int z = 0; z < numComps; z++) {
 					if (z > 0) {
 						bld.append(' ');
 					}
 					bld.append(prefix);
-				
+
 					int len = rnd.nextInt(20) + 10;
 					for (int k = 0; k < len; k++) {
 						char c = (char) (rnd.nextInt(80) + 40);
 						bld.append(c);
 					}
 				}
-				
+
 				String str = bld.toString();
-				
+
 				wrt.write(str);
 				wrt.newLine();
 			}
 		} finally {
 			wrt.close();
 		}
-		
+
 		return f;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static void main(String[] args) {
 		new MassiveStringSorting().testStringSorting();
 		new MassiveStringSorting().testStringTuplesSorting();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 9e37b79..453aa14 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.test.manual;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,12 +34,24 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Assert;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Test {@link UnilateralSortMerger} on a large set of {@link StringValue}.
+ */
 public class MassiveStringValueSorting {
 
 	private static final long SEED = 347569784659278346L;
-	
+
 	public void testStringValueSorting() {
 		File input = null;
 		File sorted = null;
@@ -55,12 +59,12 @@ public class MassiveStringValueSorting {
 		try {
 			// the source file
 			input = generateFileWithStrings(300000, "http://some-uri.com/that/is/a/common/prefix/to/all");
-			
+
 			// the sorted file
 			sorted = File.createTempFile("sorted_strings", "txt");
-			
-			String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
-			
+
+			String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
+
 			Process p = null;
 			try {
 				p = Runtime.getRuntime().exec(command);
@@ -74,38 +78,38 @@ public class MassiveStringValueSorting {
 					p.destroy();
 				}
 			}
-			
+
 			// sort the data
 			UnilateralSortMerger<StringValue> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
-			
+
 			try {
 				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
-					
+
 				TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class);
 				TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class);
-				
+
 				reader = new BufferedReader(new FileReader(input));
 				MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader);
-				
+
 				sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(),
 						new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f,
 						true /* use large record handler */, true);
 
 				MutableObjectIterator<StringValue> sortedData = sorter.getIterator();
-				
+
 				reader.close();
-				
+
 				// verify
 				verifyReader = new BufferedReader(new FileReader(sorted));
 				String nextVerify;
 				StringValue nextFromFlinkSort = new StringValue();
-				
+
 				while ((nextVerify = verifyReader.readLine()) != null) {
 					nextFromFlinkSort = sortedData.next(nextFromFlinkSort);
-					
+
 					Assert.assertNotNull(nextFromFlinkSort);
 					Assert.assertEquals(nextVerify, nextFromFlinkSort.getValue());
 				}
@@ -138,23 +142,23 @@ public class MassiveStringValueSorting {
 			}
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public void testStringValueTuplesSorting() {
-		final int NUM_STRINGS = 300000;
-		
+		final int numStrings = 300000;
+
 		File input = null;
 		File sorted = null;
 
 		try {
 			// the source file
-			input = generateFileWithStringTuples(NUM_STRINGS, "http://some-uri.com/that/is/a/common/prefix/to/all");
-			
+			input = generateFileWithStringTuples(numStrings, "http://some-uri.com/that/is/a/common/prefix/to/all");
+
 			// the sorted file
 			sorted = File.createTempFile("sorted_strings", "txt");
-			
-			String[] command = {"/bin/bash","-c","export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
-			
+
+			String[] command = {"/bin/bash", "-c", "export LC_ALL=\"C\" && cat \"" + input.getAbsolutePath() + "\" | sort > \"" + sorted.getAbsolutePath() + "\""};
+
 			Process p = null;
 			try {
 				p = Runtime.getRuntime().exec(command);
@@ -168,33 +172,31 @@ public class MassiveStringValueSorting {
 					p.destroy();
 				}
 			}
-			
+
 			// sort the data
 			UnilateralSortMerger<Tuple2<StringValue, StringValue[]>> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
-			
+
 			try {
 				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
 				IOManager ioMan = new IOManagerAsync();
-					
+
 				TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
 						TypeInfoParser.<Tuple2<StringValue, StringValue[]>>parse("Tuple2<org.apache.flink.types.StringValue, org.apache.flink.types.StringValue[]>");
 
 				TypeSerializer<Tuple2<StringValue, StringValue[]>> serializer = typeInfo.createSerializer(new ExecutionConfig());
 				TypeComparator<Tuple2<StringValue, StringValue[]>> comparator = typeInfo.createComparator(new int[] { 0 }, new boolean[] { true }, 0, new ExecutionConfig());
-				
+
 				reader = new BufferedReader(new FileReader(input));
 				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader);
-				
+
 				sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
 						new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f,
 						true /* use large record handler */, false);
 
-				
-				
 				// use this part to verify that all if good when sorting in memory
-				
+
 //				List<MemorySegment> memory = mm.allocatePages(new DummyInvokable(), mm.computeNumberOfPages(1024*1024*1024));
 //				NormalizedKeySorter<Tuple2<String, String[]>> nks = new NormalizedKeySorter<Tuple2<String,String[]>>(serializer, comparator, memory);
 //
@@ -203,36 +205,36 @@ public class MassiveStringValueSorting {
 //					while ((wi = inputIterator.next(wi)) != null) {
 //						Assert.assertTrue(nks.write(wi));
 //					}
-//					
+//
 //					new QuickSort().sort(nks);
 //				}
-//				
+//
 //				MutableObjectIterator<Tuple2<String, String[]>> sortedData = nks.getIterator();
-				
+
 				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> sortedData = sorter.getIterator();
 				reader.close();
-				
+
 				// verify
 				verifyReader = new BufferedReader(new FileReader(sorted));
 				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> verifyIterator = new StringValueTupleReaderMutableObjectIterator(verifyReader);
-				
+
 				Tuple2<StringValue, StringValue[]> nextVerify = new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]);
 				Tuple2<StringValue, StringValue[]> nextFromFlinkSort = new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]);
-				
+
 				int num = 0;
-				
+
 				while ((nextVerify = verifyIterator.next(nextVerify)) != null) {
 					num++;
-					
+
 					nextFromFlinkSort = sortedData.next(nextFromFlinkSort);
 					Assert.assertNotNull(nextFromFlinkSort);
-					
+
 					Assert.assertEquals(nextVerify.f0, nextFromFlinkSort.f0);
 					Assert.assertArrayEquals(nextVerify.f1, nextFromFlinkSort.f1);
 				}
-				
+
 				Assert.assertNull(sortedData.next(nextFromFlinkSort));
-				Assert.assertEquals(NUM_STRINGS, num);
+				Assert.assertEquals(numStrings, num);
 
 			}
 			finally {
@@ -265,23 +267,23 @@ public class MassiveStringValueSorting {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static final class StringValueReaderMutableObjectIterator implements MutableObjectIterator<StringValue> {
-		
+
 		private final BufferedReader reader;
 
 		public StringValueReaderMutableObjectIterator(BufferedReader reader) {
 			this.reader = reader;
 		}
-		
+
 		@Override
 		public StringValue next(StringValue reuse) throws IOException {
 			String line = reader.readLine();
-			
+
 			if (line == null) {
 				return null;
 			}
-			
+
 			reuse.setValue(line);
 			return reuse;
 		}
@@ -291,30 +293,30 @@ public class MassiveStringValueSorting {
 			return next(new StringValue());
 		}
 	}
-	
+
 	private static final class StringValueTupleReaderMutableObjectIterator implements MutableObjectIterator<Tuple2<StringValue, StringValue[]>> {
-		
+
 		private final BufferedReader reader;
 
 		public StringValueTupleReaderMutableObjectIterator(BufferedReader reader) {
 			this.reader = reader;
 		}
-		
+
 		@Override
 		public Tuple2<StringValue, StringValue[]> next(Tuple2<StringValue, StringValue[]> reuse) throws IOException {
 			String line = reader.readLine();
 			if (line == null) {
 				return null;
 			}
-			
+
 			String[] parts = line.split(" ");
 			reuse.f0.setValue(parts[0]);
 			reuse.f1 = new StringValue[parts.length];
-			
+
 			for (int i = 0; i < parts.length; i++) {
 				reuse.f1[i] = new StringValue(parts[i]);
 			}
-			
+
 			return reuse;
 		}
 
@@ -323,31 +325,31 @@ public class MassiveStringValueSorting {
 			return next(new Tuple2<StringValue, StringValue[]>(new StringValue(), new StringValue[0]));
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private File generateFileWithStrings(int numStrings, String prefix) throws IOException {
 		final Random rnd = new Random(SEED);
-		
+
 		final StringBuilder bld = new StringBuilder();
 		final int resetValue = prefix.length();
-		
+
 		bld.append(prefix);
-		
+
 		File f = File.createTempFile("strings", "txt");
 		BufferedWriter wrt = null;
 		try {
 			wrt = new BufferedWriter(new FileWriter(f));
-		
-			for (int i = 0 ; i < numStrings; i++) {
+
+			for (int i = 0; i < numStrings; i++) {
 				bld.setLength(resetValue);
-				
+
 				int len = rnd.nextInt(20) + 300;
 				for (int k = 0; k < len; k++) {
 					char c = (char) (rnd.nextInt(80) + 40);
 					bld.append(c);
 				}
-				
+
 				String str = bld.toString();
 				wrt.write(str);
 				wrt.newLine();
@@ -357,40 +359,40 @@ public class MassiveStringValueSorting {
 				wrt.close();
 			}
 		}
-		
+
 		return f;
 	}
-	
+
 	private File generateFileWithStringTuples(int numStrings, String prefix) throws IOException {
 		final Random rnd = new Random(SEED);
-		
+
 		final StringBuilder bld = new StringBuilder();
 
 		File f = File.createTempFile("strings", "txt");
 		BufferedWriter wrt = null;
 		try {
 			wrt = new BufferedWriter(new FileWriter(f));
-		
-			for (int i = 0 ; i < numStrings; i++) {
+
+			for (int i = 0; i < numStrings; i++) {
 				bld.setLength(0);
-				
+
 				int numComps = rnd.nextInt(5) + 1;
-				
+
 				for (int z = 0; z < numComps; z++) {
 					if (z > 0) {
 						bld.append(' ');
 					}
 					bld.append(prefix);
-				
+
 					int len = rnd.nextInt(20) + 10;
 					for (int k = 0; k < len; k++) {
 						char c = (char) (rnd.nextInt(80) + 40);
 						bld.append(c);
 					}
 				}
-				
+
 				String str = bld.toString();
-				
+
 				wrt.write(str);
 				wrt.newLine();
 			}
@@ -399,12 +401,12 @@ public class MassiveStringValueSorting {
 				wrt.close();
 			}
 		}
-		
+
 		return f;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static void main(String[] args) {
 		new MassiveStringValueSorting().testStringValueSorting();
 		new MassiveStringValueSorting().testStringValueTuplesSorting();

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
index ee3b4b2..0b8fd1c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
@@ -38,14 +38,14 @@ import static org.junit.Assert.fail;
  * with a parallelism of 100.
  */
 public class NotSoMiniClusterIterations {
-	
+
 	private static final int PARALLELISM = 100;
-	
+
 	public static void main(String[] args) {
 		if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
 			throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
 		}
-		
+
 		LocalFlinkMiniCluster cluster = null;
 
 		try {
@@ -55,7 +55,7 @@ public class NotSoMiniClusterIterations {
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000);
 			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024);
-			
+
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);