You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:08 UTC

[02/10] flink git commit: [FLINK-2725] Add Max/Min/Sum aggregation for mutable types.

[FLINK-2725] Add Max/Min/Sum aggregation for mutable types.

This closes #1191


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da248b15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da248b15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da248b15

Branch: refs/heads/master
Commit: da248b15e1b1dbe09345d3bb186dc815a45e9a3c
Parents: 6491559
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Sep 22 13:01:47 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 19 15:39:28 2015 +0200

----------------------------------------------------------------------
 .../aggregation/MaxAggregationFunction.java     |  83 ++-
 .../aggregation/MinAggregationFunction.java     |  85 ++-
 .../aggregation/SumAggregationFunction.java     | 190 ++++-
 .../flink/api/java/typeutils/ValueTypeInfo.java |  13 +-
 .../test/javaApiOperators/AggregateITCase.java  |  71 ++
 .../util/ValueCollectionDataSets.java           | 730 +++++++++++++++++++
 6 files changed, 1110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
index f25ca87..59d9e13 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
@@ -18,35 +18,74 @@
 
 package org.apache.flink.api.java.aggregation;
 
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.ResettableValue;
 
-public class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
-	private static final long serialVersionUID = 1L;
 
-	private T value;
+public abstract class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
+	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void initializeAggregate() {
-		value = null;
+	public String toString() {
+		return "MAX";
 	}
 
-	@Override
-	public void aggregate(T val) {
-		if (value != null) {
-			int cmp = value.compareTo(val);
-			value = (cmp > 0) ? value : val;
-		} else {
-			value = val;
+	// --------------------------------------------------------------------------------------------
+
+	public static final class ImmutableMaxAgg<U extends Comparable<U>> extends MaxAggregationFunction<U> {
+		private static final long serialVersionUID = 1L;
+
+		private U value;
+
+		@Override
+		public void initializeAggregate() {
+			value = null;
 		}
-	}
 
-	@Override
-	public T getAggregate() {
-		return value;
+		@Override
+		public void aggregate(U val) {
+			if (value != null) {
+				int cmp = value.compareTo(val);
+				value = (cmp > 0) ? value : val;
+			} else {
+				value = val;
+			}
+		}
+
+		@Override
+		public U getAggregate() {
+			return value;
+		}
 	}
 	
-	@Override
-	public String toString() {
-		return "MAX";
+	// --------------------------------------------------------------------------------------------
+
+	public static final class MutableMaxAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MaxAggregationFunction<U> {
+		private static final long serialVersionUID = 1L;
+
+		private U value;
+
+		@Override
+		public void initializeAggregate() {
+			value = null;
+		}
+
+		@Override
+		public void aggregate(U val) {
+			if (value != null) {
+				int cmp = value.compareTo(val);
+				if (cmp < 0) {
+					value.setValue(val);
+				}
+			} else {
+				value = val.copy();
+			}
+		}
+
+		@Override
+		public U getAggregate() {
+			return value;
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -58,7 +97,11 @@ public class MaxAggregationFunction<T extends Comparable<T>> extends Aggregation
 		@Override
 		public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
 			if (Comparable.class.isAssignableFrom(type)) {
-				return (AggregationFunction<T>) new MaxAggregationFunction();
+				if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+					return (AggregationFunction<T>) new MutableMaxAgg();
+				} else {
+					return (AggregationFunction<T>) new ImmutableMaxAgg();
+				}
 			} else {
 				throw new UnsupportedAggregationTypeException("The type " + type.getName() + 
 					" is not supported for maximum aggregation. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
index faf28a7..b72b0f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
@@ -18,35 +18,74 @@
 
 package org.apache.flink.api.java.aggregation;
 
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.ResettableValue;
 
-public class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
-	private static final long serialVersionUID = 1L;
 
-	private T value;
+public abstract class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
+	private static final long serialVersionUID = 1L;
 
 	@Override
-	public void initializeAggregate() {
-		value = null;
+	public String toString() {
+		return "MIN";
 	}
 
-	@Override
-	public void aggregate(T val) {
-		if (value != null) {
-			int cmp = value.compareTo(val);
-			value = (cmp < 0) ? value : val;
-		} else {
-			value = val;
+	// --------------------------------------------------------------------------------------------
+
+	public static final class ImmutableMinAgg<U extends Comparable<U>> extends MinAggregationFunction<U> {
+		private static final long serialVersionUID = 1L;
+
+		private U value;
+
+		@Override
+		public void initializeAggregate() {
+			value = null;
 		}
-	}
 
-	@Override
-	public T getAggregate() {
-		return value;
+		@Override
+		public void aggregate(U val) {
+			if (value != null) {
+				int cmp = value.compareTo(val);
+				value = (cmp < 0) ? value : val;
+			} else {
+				value = val;
+			}
+		}
+
+		@Override
+		public U getAggregate() {
+			return value;
+		}
 	}
-	
-	@Override
-	public String toString() {
-		return "MIN";
+
+	// --------------------------------------------------------------------------------------------
+
+	public static final class MutableMinAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MinAggregationFunction<U> {
+		private static final long serialVersionUID = 1L;
+
+		private U value;
+
+		@Override
+		public void initializeAggregate() {
+			value = null;
+		}
+
+		@Override
+		public void aggregate(U val) {
+			if (value != null) {
+				int cmp = value.compareTo(val);
+				if (cmp > 0) {
+					value.setValue(val);
+				}
+			} else {
+				value = val.copy();
+			}
+		}
+
+		@Override
+		public U getAggregate() {
+			return value;
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -58,7 +97,11 @@ public class MinAggregationFunction<T extends Comparable<T>> extends Aggregation
 		@Override
 		public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
 			if (Comparable.class.isAssignableFrom(type)) {
-				return (AggregationFunction<T>) new MinAggregationFunction();
+				if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+					return (AggregationFunction<T>) new MutableMinAgg();
+				} else {
+					return (AggregationFunction<T>) new ImmutableMinAgg();
+				}
 			} else {
 				throw new UnsupportedAggregationTypeException("The type " + type.getName() + 
 					" is not supported for minimum aggregation. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
index 24e8f31..ad4644b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
@@ -18,21 +18,27 @@
 
 package org.apache.flink.api.java.aggregation;
 
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
 
 public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	@Override
 	public String toString() {
 		return "SUM";
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static final class ByteSumAgg extends SumAggregationFunction<Byte> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private long agg;
 
 		@Override
@@ -50,10 +56,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
 			return (byte) agg;
 		}
 	}
-	
+
+	public static final class ByteValueSumAgg extends SumAggregationFunction<ByteValue> {
+		private static final long serialVersionUID = 1L;
+
+		private long agg;
+
+		@Override
+		public void initializeAggregate() {
+			agg = 0;
+		}
+
+		@Override
+		public void aggregate(ByteValue value) {
+			agg += value.getValue();
+		}
+
+		@Override
+		public ByteValue getAggregate() {
+			return new ByteValue((byte) agg);
+		}
+	}
+
 	public static final class ShortSumAgg extends SumAggregationFunction<Short> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private long agg;
 
 		@Override
@@ -71,10 +98,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
 			return (short) agg;
 		}
 	}
-	
+
+	public static final class ShortValueSumAgg extends SumAggregationFunction<ShortValue> {
+		private static final long serialVersionUID = 1L;
+
+		private long agg;
+
+		@Override
+		public void initializeAggregate() {
+			agg = 0;
+		}
+
+		@Override
+		public void aggregate(ShortValue value) {
+			agg += value.getValue();
+		}
+
+		@Override
+		public ShortValue getAggregate() {
+			return new ShortValue((short) agg);
+		}
+	}
+
 	public static final class IntSumAgg extends SumAggregationFunction<Integer> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private long agg;
 
 		@Override
@@ -92,10 +140,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
 			return (int) agg;
 		}
 	}
-	
+
+	public static final class IntValueSumAgg extends SumAggregationFunction<IntValue> {
+		private static final long serialVersionUID = 1L;
+
+		private long agg;
+
+		@Override
+		public void initializeAggregate() {
+			agg = 0;
+		}
+
+		@Override
+		public void aggregate(IntValue value) {
+			agg += value.getValue();
+		}
+
+		@Override
+		public IntValue getAggregate() {
+			return new IntValue((int) agg);
+		}
+	}
+
 	public static final class LongSumAgg extends SumAggregationFunction<Long> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private long agg;
 
 		@Override
@@ -113,11 +182,32 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
 			return agg;
 		}
 	}
-	
+
+	public static final class LongValueSumAgg extends SumAggregationFunction<LongValue> {
+		private static final long serialVersionUID = 1L;
+
+		private long agg;
+
+		@Override
+		public void initializeAggregate() {
+			agg = 0L;
+		}
+
+		@Override
+		public void aggregate(LongValue value) {
+			agg += value.getValue();
+		}
+
+		@Override
+		public LongValue getAggregate() {
+			return new LongValue(agg);
+		}
+	}
+
 	public static final class FloatSumAgg extends SumAggregationFunction<Float> {
 		private static final long serialVersionUID = 1L;
-		
-		private float agg;
+
+		private double agg;
 
 		@Override
 		public void initializeAggregate() {
@@ -131,13 +221,34 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
 
 		@Override
 		public Float getAggregate() {
-			return agg;
+			return (float) agg;
+		}
+	}
+
+	public static final class FloatValueSumAgg extends SumAggregationFunction<FloatValue> {
+		private static final long serialVersionUID = 1L;
+
+		private double agg;
+
+		@Override
+		public void initializeAggregate() {
+			agg = 0.0f;
+		}
+
+		@Override
+		public void aggregate(FloatValue value) {
+			agg += value.getValue();
+		}
+
+		@Override
+		public FloatValue getAggregate() {
+			return new FloatValue((float) agg);
 		}
 	}
-	
+
 	public static final class DoubleSumAgg extends SumAggregationFunction<Double> {
 		private static final long serialVersionUID = 1L;
-		
+
 		private double agg;
 
 		@Override
@@ -155,36 +266,75 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
 			return agg;
 		}
 	}
-	
+
+	public static final class DoubleValueSumAgg extends SumAggregationFunction<DoubleValue> {
+		private static final long serialVersionUID = 1L;
+
+		private double agg;
+
+		@Override
+		public void initializeAggregate() {
+			agg = 0.0;
+		}
+
+		@Override
+		public void aggregate(DoubleValue value) {
+			agg += value.getValue();
+		}
+
+		@Override
+		public DoubleValue getAggregate() {
+			return new DoubleValue(agg);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static final class SumAggregationFunctionFactory implements AggregationFunctionFactory {
 		private static final long serialVersionUID = 1L;
-		
+
 		@SuppressWarnings("unchecked")
 		@Override
 		public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
 			if (type == Long.class) {
 				return (AggregationFunction<T>) new LongSumAgg();
 			}
+			else if (type == LongValue.class) {
+				return (AggregationFunction<T>) new LongValueSumAgg();
+			}
 			else if (type == Integer.class) {
 				return (AggregationFunction<T>) new IntSumAgg();
 			}
+			else if (type == IntValue.class) {
+				return (AggregationFunction<T>) new IntValueSumAgg();
+			}
 			else if (type == Double.class) {
 				return (AggregationFunction<T>) new DoubleSumAgg();
 			}
+			else if (type == DoubleValue.class) {
+				return (AggregationFunction<T>) new DoubleValueSumAgg();
+			}
 			else if (type == Float.class) {
 				return (AggregationFunction<T>) new FloatSumAgg();
 			}
+			else if (type == FloatValue.class) {
+				return (AggregationFunction<T>) new FloatValueSumAgg();
+			}
 			else if (type == Byte.class) {
 				return (AggregationFunction<T>) new ByteSumAgg();
 			}
+			else if (type == ByteValue.class) {
+				return (AggregationFunction<T>) new ByteValueSumAgg();
+			}
 			else if (type == Short.class) {
 				return (AggregationFunction<T>) new ShortSumAgg();
 			}
+			else if (type == ShortValue.class) {
+				return (AggregationFunction<T>) new ShortValueSumAgg();
+			}
 			else {
 				throw new UnsupportedAggregationTypeException("The type " + type.getName() + 
-					" has currently not supported for built-in sum aggregations.");
+					" is currently not supported for built-in sum aggregations.");
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 0b4823e..5187de7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -51,7 +51,18 @@ import org.apache.flink.types.Value;
 public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
 
 	private static final long serialVersionUID = 1L;
-	
+
+	public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class);
+	public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class);
+	public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class);
+	public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class);
+	public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class);
+	public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class);
+	public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class);
+	public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class);
+	public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class);
+	public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
+
 	private final Class<T> type;
 	
 	public ValueTypeInfo(Class<T> type) {

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index d02f228..fc01ce7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -25,7 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -62,6 +66,27 @@ public class AggregateITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testFullAggregateOfMutableValueTypes() throws Exception {
+		/*
+		 * Full Aggregate of mutable value types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
+				.aggregate(Aggregations.SUM, 0)
+				.and(Aggregations.MAX, 1)
+				.project(0, 1);
+
+		List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+		String expected = "231,6\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
 	public void testGroupedAggregate() throws Exception {
 		/*
 		 * Grouped Aggregate
@@ -87,6 +112,31 @@ public class AggregateITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testGroupedAggregateOfMutableValueTypes() throws Exception {
+		/*
+		 * Grouped Aggregate of mutable value types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.SUM, 0)
+				.project(1, 0);
+
+		List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+		String expected = "1,1\n" +
+				"2,5\n" +
+				"3,15\n" +
+				"4,34\n" +
+				"5,65\n" +
+				"6,111\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
 	public void testNestedAggregate() throws Exception {
 		/*
 		 * Nested Aggregate
@@ -106,4 +156,25 @@ public class AggregateITCase extends MultipleProgramsTestBase {
 
 		compareResultAsTuples(result, expected);
 	}
+
+	@Test
+	public void testNestedAggregateOfMutableValueTypes() throws Exception {
+		/*
+		 * Nested Aggregate of mutable value types
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
+				.aggregate(Aggregations.MIN, 0)
+				.aggregate(Aggregations.MIN, 0)
+				.project(0);
+
+		List<Tuple1<IntValue>> result = aggregateDs.collect();
+
+		String expected = "1\n";
+
+		compareResultAsTuples(result, expected);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
new file mode 100644
index 0000000..04a7bc5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.util;
+
+import java.io.File;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.io.IntWritable;
+
+import scala.math.BigInt;
+
+/**
+ * #######################################################################################################
+ * 
+ * 			BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. 
+ * 			IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
+ * 
+ * #######################################################################################################
+ */
+public class ValueCollectionDataSets {
+
+	public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi")));
+		data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello")));
+		data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world")));
+		data.add(new Tuple3<>(new IntValue(4), new LongValue(3l), new StringValue("Hello world, how are you?")));
+		data.add(new Tuple3<>(new IntValue(5), new LongValue(3l), new StringValue("I am fine.")));
+		data.add(new Tuple3<>(new IntValue(6), new LongValue(3l), new StringValue("Luke Skywalker")));
+		data.add(new Tuple3<>(new IntValue(7), new LongValue(4l), new StringValue("Comment#1")));
+		data.add(new Tuple3<>(new IntValue(8), new LongValue(4l), new StringValue("Comment#2")));
+		data.add(new Tuple3<>(new IntValue(9), new LongValue(4l), new StringValue("Comment#3")));
+		data.add(new Tuple3<>(new IntValue(10), new LongValue(4l), new StringValue("Comment#4")));
+		data.add(new Tuple3<>(new IntValue(11), new LongValue(5l), new StringValue("Comment#5")));
+		data.add(new Tuple3<>(new IntValue(12), new LongValue(5l), new StringValue("Comment#6")));
+		data.add(new Tuple3<>(new IntValue(13), new LongValue(5l), new StringValue("Comment#7")));
+		data.add(new Tuple3<>(new IntValue(14), new LongValue(5l), new StringValue("Comment#8")));
+		data.add(new Tuple3<>(new IntValue(15), new LongValue(5l), new StringValue("Comment#9")));
+		data.add(new Tuple3<>(new IntValue(16), new LongValue(6l), new StringValue("Comment#10")));
+		data.add(new Tuple3<>(new IntValue(17), new LongValue(6l), new StringValue("Comment#11")));
+		data.add(new Tuple3<>(new IntValue(18), new LongValue(6l), new StringValue("Comment#12")));
+		data.add(new Tuple3<>(new IntValue(19), new LongValue(6l), new StringValue("Comment#13")));
+		data.add(new Tuple3<>(new IntValue(20), new LongValue(6l), new StringValue("Comment#14")));
+		data.add(new Tuple3<>(new IntValue(21), new LongValue(6l), new StringValue("Comment#15")));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) {
+		// Three (IntValue, LongValue, StringValue) rows, handed to the environment in shuffled order.
+		List<Tuple3<IntValue, LongValue, StringValue>> rows = new ArrayList<>();
+		rows.add(new Tuple3<>(new IntValue(1), new LongValue(1L), new StringValue("Hi")));
+		rows.add(new Tuple3<>(new IntValue(2), new LongValue(2L), new StringValue("Hello")));
+		rows.add(new Tuple3<>(new IntValue(3), new LongValue(2L), new StringValue("Hello world")));
+		Collections.shuffle(rows);
+
+		DataSet<Tuple3<IntValue, LongValue, StringValue>> shuffledRows = env.fromCollection(rows);
+		return shuffledRows;
+	}
+
+	public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
+
+		data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
+		data.add(new Tuple5<>(new IntValue(3), new LongValue(4l), new IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(3), new LongValue(5l), new IntValue(4), new StringValue("ABC"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(3), new LongValue(6l), new IntValue(5), new StringValue("BCD"), new LongValue(3l)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(7l), new IntValue(6), new StringValue("CDE"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(8l), new IntValue(7), new StringValue("DEF"), new LongValue(1l)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(9l), new IntValue(8), new StringValue("EFG"), new LongValue(1l)));
+		data.add(new Tuple5<>(new IntValue(4), new LongValue(10l), new IntValue(9), new StringValue("FGH"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(11l), new IntValue(10), new StringValue("GHI"), new LongValue(1l)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(12l), new IntValue(11), new StringValue("HIJ"), new LongValue(3l)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(13l), new IntValue(12), new StringValue("IJK"), new LongValue(3l)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(14l), new IntValue(13), new StringValue("JKL"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(5), new LongValue(15l), new IntValue(14), new StringValue("KLM"), new LongValue(2l)));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
+			TupleTypeInfo<>(
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) {
+		List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
+
+		data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
+		data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
+
+		Collections.shuffle(data);
+
+		TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
+			TupleTypeInfo<>(
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO,
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.LONG_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(1)), new StringValue("one")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("two")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("three")));
+
+		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
+			TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+		List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(3)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(2)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(1)), new StringValue("a")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("b")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("c")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(6)), new StringValue("c")));
+		data.add(new Tuple2<>(new Tuple2<>(new IntValue(4), new IntValue(9)), new StringValue("c")));
+
+		TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
+			TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+
+	public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+		List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> data = new ArrayList<>();
+
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6)));
+		data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7)));
+
+		TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> type = new
+			TupleTypeInfo<>(
+				new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+				ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+				ValueTypeInfo.INT_VALUE_TYPE_INFO
+		);
+
+		return env.fromCollection(data, type);
+	}
+	
+	public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) {
+		// Eight sentences, each wrapped in its own StringValue and returned in shuffled order.
+		String[] sentences = {
+			"Hi", "Hello",
+			"Hello world", "Hello world, how are you?",
+			"I am fine.", "Luke Skywalker",
+			"Random comment", "LOL"
+		};
+
+		List<StringValue> wrapped = new ArrayList<>(sentences.length);
+		for (String sentence : sentences) {
+			wrapped.add(new StringValue(sentence));
+		}
+		Collections.shuffle(wrapped);
+		return env.fromCollection(wrapped);
+	}
+
+	public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) {
+		List<IntValue> data = new ArrayList<>();
+
+		data.add(new IntValue(1));
+		data.add(new IntValue(2));
+		data.add(new IntValue(2));
+		data.add(new IntValue(3));
+		data.add(new IntValue(3));
+		data.add(new IntValue(3));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(4));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+		data.add(new IntValue(5));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
+		List<CustomType> data = new ArrayList<CustomType>();
+
+		data.add(new CustomType(1, 0l, "Hi"));
+		data.add(new CustomType(2, 1l, "Hello"));
+		data.add(new CustomType(2, 2l, "Hello world"));
+		data.add(new CustomType(3, 3l, "Hello world, how are you?"));
+		data.add(new CustomType(3, 4l, "I am fine."));
+		data.add(new CustomType(3, 5l, "Luke Skywalker"));
+		data.add(new CustomType(4, 6l, "Comment#1"));
+		data.add(new CustomType(4, 7l, "Comment#2"));
+		data.add(new CustomType(4, 8l, "Comment#3"));
+		data.add(new CustomType(4, 9l, "Comment#4"));
+		data.add(new CustomType(5, 10l, "Comment#5"));
+		data.add(new CustomType(5, 11l, "Comment#6"));
+		data.add(new CustomType(5, 12l, "Comment#7"));
+		data.add(new CustomType(5, 13l, "Comment#8"));
+		data.add(new CustomType(5, 14l, "Comment#9"));
+		data.add(new CustomType(6, 15l, "Comment#10"));
+		data.add(new CustomType(6, 16l, "Comment#11"));
+		data.add(new CustomType(6, 17l, "Comment#12"));
+		data.add(new CustomType(6, 18l, "Comment#13"));
+		data.add(new CustomType(6, 19l, "Comment#14"));
+		data.add(new CustomType(6, 20l, "Comment#15"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
+		// Three CustomType records, handed to the environment in shuffled order.
+		List<CustomType> records = new ArrayList<>();
+		records.add(new CustomType(1, 0L, "Hi"));
+		records.add(new CustomType(2, 1L, "Hello"));
+		records.add(new CustomType(2, 2L, "Hello world"));
+		Collections.shuffle(records);
+
+		DataSet<CustomType> shuffledRecords = env.fromCollection(records);
+		return shuffledRecords;
+	}
+
+	public static class CustomType implements Serializable {
+		// Serializable record of three public, non-final value-type fields.
+		private static final long serialVersionUID = 1L;
+
+		public IntValue myInt;
+		public LongValue myLong;
+		public StringValue myString;
+
+		public CustomType() {
+		}
+
+		public CustomType(int i, long l, String s) {
+			this.myInt = new IntValue(i);
+			this.myLong = new LongValue(l);
+			this.myString = new StringValue(s);
+		}
+
+		@Override
+		public String toString() { // renders the three fields comma-separated, in declaration order
+			return String.valueOf(myInt) + "," + String.valueOf(myLong) + "," + String.valueOf(myString);
+		}
+	}
+
+	public static class CustomTypeComparator implements Comparator<CustomType> {
+		/** Orders by myInt, then myLong, then myString; overflow-safe, unlike subtracting the values. */
+		@Override
+		public int compare(CustomType o1, CustomType o2) {
+			int diff = Integer.compare(o1.myInt.getValue(), o2.myInt.getValue());
+			if (diff != 0) {
+				return diff;
+			}
+			diff = Long.compare(o1.myLong.getValue(), o2.myLong.getValue()); // an (int) cast of the long difference could truncate and flip the sign
+			return diff != 0 ? diff : o1.myString.getValue().compareTo(o2.myString.getValue());
+		}
+
+	}
+
+	public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
+		List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> data = new ArrayList<>();
+		
+		data.add(new Tuple7<>(new IntValue(1), new StringValue("First"), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new LongValue(10000L)));
+		data.add(new Tuple7<>(new IntValue(2), new StringValue("Second"), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new LongValue(20000L)));
+		data.add(new Tuple7<>(new IntValue(3), new StringValue("Third"), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new LongValue(30000L)));
+
+		return env.fromCollection(data);
+	}
+	
+	public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+		List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> data = new ArrayList<>();
+		
+		data.add(new Tuple7<>(new LongValue(10000L), new IntValue(10), new IntValue(100), new LongValue(1000L), new StringValue("One"), new IntValue(1), new StringValue("First")));
+		data.add(new Tuple7<>(new LongValue(20000L), new IntValue(20), new IntValue(200), new LongValue(2000L), new StringValue("Two"), new IntValue(2), new StringValue("Second")));
+		data.add(new Tuple7<>(new LongValue(30000L), new IntValue(30), new IntValue(300), new LongValue(3000L), new StringValue("Three"), new IntValue(3), new StringValue("Third")));
+		
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<POJO>();
+
+		data.add(new POJO(1 /*number*/, "First" /*str*/, 10 /*f0*/, 100/*f1.myInt*/, 1000L/*f1.myLong*/, "One" /*f1.myString*/, 10000L /*nestedPojo.longNumber*/));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<POJO>();
+
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L)); // 5x
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+		data.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L)); // 2x
+		data.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+		List<POJO> data = new ArrayList<POJO>();
+
+		data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L)); // all 8 rows are distinct; number runs 1..8
+		data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+		data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+		data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+		data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+		data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+		data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L));
+		data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+
+		return env.fromCollection(data); // insertion order is kept; this fixture is not shuffled
+	}
+
+	public static class POJO { // nested fixture: two value fields, a tuple holding a CustomType, and a nested object
+		public IntValue number;
+		public StringValue str;
+		public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
+		public NestedPojo nestedPojo;
+		public transient LongValue ignoreMe; // never assigned by either constructor
+
+		public POJO(int i0, String s0, // -> number, str
+					int i1, int i2, long l0, String s1, // -> nestedTupleWithCustom.f0 and the CustomType stored in f1
+					long l1) { // -> nestedPojo.longNumber
+			this.number = new IntValue(i0);
+			this.str = new StringValue(s0);
+			this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1));
+			this.nestedPojo = new NestedPojo();
+			this.nestedPojo.longNumber = new LongValue(l1);
+		}
+
+		public POJO() { // leaves every field null
+		}
+
+		@Override
+		public String toString() { // null-safe: nestedPojo is unset after the no-arg constructor, so print "null" instead of throwing
+			return number + " " + str + " " + nestedTupleWithCustom + " " + (nestedPojo == null ? null : nestedPojo.longNumber);
+		}
+	}
+
+	public static class NestedPojo { // holder with a single instance field, longNumber
+		public static Object ignoreMe; // static: belongs to the class, not to any instance
+		public LongValue longNumber;
+
+		public NestedPojo() { // leaves longNumber null
+		}
+	}
+
+	public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
+		// One record for "aa", two for "bb", three for "cc"; insertion order is kept (no shuffle).
+		String[] innermostValues = {"aa", "bb", "bb", "cc", "cc", "cc"};
+
+		List<CrazyNested> records = new ArrayList<>(innermostValues.length);
+		for (String innermost : innermostValues) {
+			records.add(new CrazyNested(innermost));
+		}
+
+		DataSet<CrazyNested> nested = env.fromCollection(records);
+		return nested;
+	}
+
+	public static class CrazyNested {
+		public CrazyNestedL1 nest_Lvl1;
+		public LongValue something; // stays null unless the three-argument constructor is used
+		public CrazyNested() {
+		}
+
+		// Builds the full chain via CrazyNested(String), then also fills something and nest_Lvl1.a; nest_Lvl1.b stays null.
+		public CrazyNested(String set, String second, long s) {
+			this(set);
+			this.something = new LongValue(s);
+			this.nest_Lvl1.a = new StringValue(second);
+		}
+
+		public CrazyNested(String set) { // creates levels 1 to 4 and stores set in the innermost field f1nal
+			CrazyNestedL4 innermost = new CrazyNestedL4();
+			innermost.f1nal = new StringValue(set);
+			nest_Lvl1 = new CrazyNestedL1();
+			nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
+			nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
+			nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = innermost;
+		}
+	}
+
+	public static class CrazyNestedL1 {
+		public StringValue a;
+		public IntValue b;
+		public CrazyNestedL2 nest_Lvl2;
+	}
+
+	public static class CrazyNestedL2 {
+		public CrazyNestedL3 nest_Lvl3;
+	}
+
+	public static class CrazyNestedL3 {
+		public CrazyNestedL4 nest_Lvl4;
+	}
+
+	public static class CrazyNestedL4 {
+		public StringValue f1nal;
+	}
+
+	// Copied from TypeExtractorTest
+	public static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> {
+		private static final long serialVersionUID = 1L;
+		public IntValue special;
+	}
+
+	public static class FromTupleWithCTor extends FromTuple {
+		// Tuple subclass with an extra field plus a convenience constructor.
+		private static final long serialVersionUID = 1L;
+
+		public FromTupleWithCTor() {}
+
+		public FromTupleWithCTor(int special, long tupleField) { // only special and tuple position 2 are set here
+			this.f2 = new LongValue(tupleField);
+			this.special = new IntValue(special);
+		}
+	}
+
+	public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
+		List<FromTupleWithCTor> data = new ArrayList<>();
+		data.add(new FromTupleWithCTor(1, 10L)); // 3x
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(1, 10L));
+		data.add(new FromTupleWithCTor(2, 20L)); // 2x
+		data.add(new FromTupleWithCTor(2, 20L));
+		return env.fromCollection(data);
+	}
+
+	public static class PojoContainingTupleAndWritable {
+		public IntValue someInt;
+		public StringValue someString;
+		public IntWritable hadoopFan;
+		public Tuple2<LongValue, LongValue> theTuple;
+
+		public PojoContainingTupleAndWritable() {
+		}
+
+		public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+			hadoopFan = new IntWritable(i);
+			someInt = new IntValue(i);
+			theTuple = new Tuple2<>(new LongValue(l1), new LongValue(l2));
+		}
+	}
+
+	public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // 1x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5x
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+		return env.fromCollection(data);
+	}
+
+
+
+	public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+		List<PojoContainingTupleAndWritable> data = new ArrayList<>();
+		data.add(new PojoContainingTupleAndWritable(1, 10L, 100L)); // the only row with someInt == 1
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 200L)); // 5 rows with someInt == 2, each with a different theTuple
+		data.add(new PojoContainingTupleAndWritable(2, 20L, 201L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 200L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 600L));
+		data.add(new PojoContainingTupleAndWritable(2, 30L, 400L));
+		return env.fromCollection(data);
+	}
+
+	public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+		List<Tuple3<IntValue, CrazyNested, POJO>> data = new ArrayList<>();
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 3x
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(1), new CrazyNested("one", "uno", 1L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+		// POJO is not initialized according to the first two fields.
+		data.add(new Tuple3<IntValue, CrazyNested, POJO>(new IntValue(2), new CrazyNested("two", "duo", 2L), new POJO(1, "First", 10, 100, 1000L, "One", 10000L))); // 1x
+		return env.fromCollection(data);
+	}
+
+	public static class Pojo1 {
+		public StringValue a;
+		public StringValue b;
+
+		public Pojo1() {}
+
+		public Pojo1(String a, String b) {
+			this.a = new StringValue(a);
+			this.b = new StringValue(b);
+		}
+	}
+
+	public static class Pojo2 {
+		public StringValue a2;
+		public StringValue b2;
+	}
+
+	public static class PojoWithMultiplePojos {
+		public Pojo1 p1;
+		public Pojo2 p2;
+		public IntValue i0;
+
+		public PojoWithMultiplePojos() {
+		}
+
+		// (a, b) populate p1; (a1, b1) populate p2.a2 and p2.b2; i0 is wrapped in an IntValue.
+		public PojoWithMultiplePojos(String a, String b, String a1, String b1, int i0) {
+			this.i0 = new IntValue(i0);
+			this.p1 = new Pojo1(a, b);
+			Pojo2 second = new Pojo2();
+			second.a2 = new StringValue(a1);
+			second.b2 = new StringValue(b1);
+			this.p2 = second;
+		}
+	}
+
+	public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
+		List<PojoWithMultiplePojos> data = new ArrayList<>();
+		data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+		return env.fromCollection(data);
+	}
+
+	public enum Category {
+		CAT_A, CAT_B;
+	}
+
+	public static class PojoWithDateAndEnum {
+		public StringValue group;
+		public Date date;
+		public Category cat;
+	}
+	
+	public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
+		List<PojoWithDateAndEnum> data = new ArrayList<PojoWithDateAndEnum>();
+		
+		PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+		one.group = new StringValue("a");
+		one.date = new Date(666);
+		one.cat = Category.CAT_A;
+		data.add(one);
+		
+		PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+		two.group = new StringValue("a");
+		two.date = new Date(666);
+		two.cat = Category.CAT_A;
+		data.add(two);
+		
+		PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+		three.group = new StringValue("b");
+		three.date = new Date(666);
+		three.cat = Category.CAT_B;
+		data.add(three);
+		
+		return env.fromCollection(data);
+	}
+
+	public static class PojoWithCollection {
+		public List<Pojo1> pojos;
+		public IntValue key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" +
+					"pojos.size()=" + pojos.size() +
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	public static class PojoWithCollectionGeneric { // same public fields as PojoWithCollection plus one private field
+		public List<Pojo1> pojos;
+		public IntValue key;
+		public java.sql.Date sqlDate;
+		public BigInteger bigInt;
+		public BigDecimal bigDecimalKeepItNull;
+		public BigInt scalaBigInt;
+		public List<Object> mixed;
+		private PojoWithDateAndEnum makeMeGeneric; // NOTE(review): private, no accessors, never read here; presumably forces generic-type handling, confirm
+
+		@Override
+		public String toString() {
+			return "PojoWithCollection{" + // NOTE(review): label omits "Generic" and makeMeGeneric is not printed; confirm intended
+					"pojos.size()=" + pojos.size() + // throws NullPointerException when pojos is null
+					", key=" + key +
+					", sqlDate=" + sqlDate +
+					", bigInt=" + bigInt +
+					", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+					", scalaBigInt=" + scalaBigInt +
+					", mixed=" + mixed +
+					'}';
+		}
+	}
+
+	public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+		List<PojoWithCollection> data = new ArrayList<>(); // two records, both with key 0
+
+		List<Pojo1> pojosList1 = new ArrayList<>();
+		pojosList1.add(new Pojo1("a", "aa"));
+		pojosList1.add(new Pojo1("b", "bb"));
+
+		List<Pojo1> pojosList2 = new ArrayList<>();
+		pojosList2.add(new Pojo1("a2", "aa2"));
+		pojosList2.add(new Pojo1("b2", "bb2"));
+
+		PojoWithCollection pwc1 = new PojoWithCollection();
+		pwc1.pojos = pojosList1;
+		pwc1.key = new IntValue(0);
+		pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); // deliberately larger than any long
+		pwc1.scalaBigInt = BigInt.int2bigInt(10);
+		pwc1.bigDecimalKeepItNull = null;
+
+		// use calendar to make it stable across time zones (month is zero-based; plain 4, not the octal literal 04)
+		GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
+		pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
+		pwc1.mixed = new ArrayList<Object>(); // heterogeneous content: a map, a File and a String
+		Map<StringValue, IntValue> map = new HashMap<>();
+		map.put(new StringValue("someKey"), new IntValue(1));
+		pwc1.mixed.add(map);
+		pwc1.mixed.add(new File("/this/is/wrong"));
+		pwc1.mixed.add("uhlala");
+
+		PojoWithCollection pwc2 = new PojoWithCollection(); // pwc2.mixed is left null
+		pwc2.pojos = pojosList2;
+		pwc2.key = new IntValue(0);
+		pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+		pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+		pwc2.bigDecimalKeepItNull = null;
+
+		GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
+		pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
+
+		data.add(pwc1);
+		data.add(pwc2);
+
+		return env.fromCollection(data);
+	}
+
+}
+