You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:08 UTC
[02/10] flink git commit: [FLINK-2725] Add Max/Min/Sum aggregation for mutable types.
[FLINK-2725] Add Max/Min/Sum aggregation for mutable types.
This closes #1191
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da248b15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da248b15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da248b15
Branch: refs/heads/master
Commit: da248b15e1b1dbe09345d3bb186dc815a45e9a3c
Parents: 6491559
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Sep 22 13:01:47 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 19 15:39:28 2015 +0200
----------------------------------------------------------------------
.../aggregation/MaxAggregationFunction.java | 83 ++-
.../aggregation/MinAggregationFunction.java | 85 ++-
.../aggregation/SumAggregationFunction.java | 190 ++++-
.../flink/api/java/typeutils/ValueTypeInfo.java | 13 +-
.../test/javaApiOperators/AggregateITCase.java | 71 ++
.../util/ValueCollectionDataSets.java | 730 +++++++++++++++++++
6 files changed, 1110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
index f25ca87..59d9e13 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MaxAggregationFunction.java
@@ -18,35 +18,74 @@
package org.apache.flink.api.java.aggregation;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.ResettableValue;
-public class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
- private static final long serialVersionUID = 1L;
- private T value;
+public abstract class MaxAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
@Override
- public void initializeAggregate() {
- value = null;
+ public String toString() {
+ return "MAX";
}
- @Override
- public void aggregate(T val) {
- if (value != null) {
- int cmp = value.compareTo(val);
- value = (cmp > 0) ? value : val;
- } else {
- value = val;
+ // --------------------------------------------------------------------------------------------
+
+ public static final class ImmutableMaxAgg<U extends Comparable<U>> extends MaxAggregationFunction<U> {
+ private static final long serialVersionUID = 1L;
+
+ private U value;
+
+ @Override
+ public void initializeAggregate() {
+ value = null;
}
- }
- @Override
- public T getAggregate() {
- return value;
+ @Override
+ public void aggregate(U val) {
+ if (value != null) {
+ int cmp = value.compareTo(val);
+ value = (cmp > 0) ? value : val;
+ } else {
+ value = val;
+ }
+ }
+
+ @Override
+ public U getAggregate() {
+ return value;
+ }
}
- @Override
- public String toString() {
- return "MAX";
+ // --------------------------------------------------------------------------------------------
+
+ public static final class MutableMaxAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MaxAggregationFunction<U> {
+ private static final long serialVersionUID = 1L;
+
+ private U value;
+
+ @Override
+ public void initializeAggregate() {
+ value = null;
+ }
+
+ @Override
+ public void aggregate(U val) {
+ if (value != null) {
+ int cmp = value.compareTo(val);
+ if (cmp < 0) {
+ value.setValue(val);
+ }
+ } else {
+ value = val.copy();
+ }
+ }
+
+ @Override
+ public U getAggregate() {
+ return value;
+ }
}
// --------------------------------------------------------------------------------------------
@@ -58,7 +97,11 @@ public class MaxAggregationFunction<T extends Comparable<T>> extends Aggregation
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
if (Comparable.class.isAssignableFrom(type)) {
- return (AggregationFunction<T>) new MaxAggregationFunction();
+ if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+ return (AggregationFunction<T>) new MutableMaxAgg();
+ } else {
+ return (AggregationFunction<T>) new ImmutableMaxAgg();
+ }
} else {
throw new UnsupportedAggregationTypeException("The type " + type.getName() +
" is not supported for maximum aggregation. " +
http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
index faf28a7..b72b0f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/MinAggregationFunction.java
@@ -18,35 +18,74 @@
package org.apache.flink.api.java.aggregation;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.ResettableValue;
-public class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
- private static final long serialVersionUID = 1L;
- private T value;
+public abstract class MinAggregationFunction<T extends Comparable<T>> extends AggregationFunction<T> {
+ private static final long serialVersionUID = 1L;
@Override
- public void initializeAggregate() {
- value = null;
+ public String toString() {
+ return "MIN";
}
- @Override
- public void aggregate(T val) {
- if (value != null) {
- int cmp = value.compareTo(val);
- value = (cmp < 0) ? value : val;
- } else {
- value = val;
+ // --------------------------------------------------------------------------------------------
+
+ public static final class ImmutableMinAgg<U extends Comparable<U>> extends MinAggregationFunction<U> {
+ private static final long serialVersionUID = 1L;
+
+ private U value;
+
+ @Override
+ public void initializeAggregate() {
+ value = null;
}
- }
- @Override
- public T getAggregate() {
- return value;
+ @Override
+ public void aggregate(U val) {
+ if (value != null) {
+ int cmp = value.compareTo(val);
+ value = (cmp < 0) ? value : val;
+ } else {
+ value = val;
+ }
+ }
+
+ @Override
+ public U getAggregate() {
+ return value;
+ }
}
-
- @Override
- public String toString() {
- return "MIN";
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class MutableMinAgg<U extends Comparable<U> & ResettableValue<U> & CopyableValue<U>> extends MinAggregationFunction<U> {
+ private static final long serialVersionUID = 1L;
+
+ private U value;
+
+ @Override
+ public void initializeAggregate() {
+ value = null;
+ }
+
+ @Override
+ public void aggregate(U val) {
+ if (value != null) {
+ int cmp = value.compareTo(val);
+ if (cmp > 0) {
+ value.setValue(val);
+ }
+ } else {
+ value = val.copy();
+ }
+ }
+
+ @Override
+ public U getAggregate() {
+ return value;
+ }
}
// --------------------------------------------------------------------------------------------
@@ -58,7 +97,11 @@ public class MinAggregationFunction<T extends Comparable<T>> extends Aggregation
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
if (Comparable.class.isAssignableFrom(type)) {
- return (AggregationFunction<T>) new MinAggregationFunction();
+ if (ResettableValue.class.isAssignableFrom(type) & CopyableValue.class.isAssignableFrom(type)) {
+ return (AggregationFunction<T>) new MutableMinAgg();
+ } else {
+ return (AggregationFunction<T>) new ImmutableMinAgg();
+ }
} else {
throw new UnsupportedAggregationTypeException("The type " + type.getName() +
" is not supported for minimum aggregation. " +
http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
index 24e8f31..ad4644b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/SumAggregationFunction.java
@@ -18,21 +18,27 @@
package org.apache.flink.api.java.aggregation;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
-
+
private static final long serialVersionUID = 1L;
@Override
public String toString() {
return "SUM";
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public static final class ByteSumAgg extends SumAggregationFunction<Byte> {
private static final long serialVersionUID = 1L;
-
+
private long agg;
@Override
@@ -50,10 +56,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
return (byte) agg;
}
}
-
+
+ public static final class ByteValueSumAgg extends SumAggregationFunction<ByteValue> {
+ private static final long serialVersionUID = 1L;
+
+ private long agg;
+
+ @Override
+ public void initializeAggregate() {
+ agg = 0;
+ }
+
+ @Override
+ public void aggregate(ByteValue value) {
+ agg += value.getValue();
+ }
+
+ @Override
+ public ByteValue getAggregate() {
+ return new ByteValue((byte) agg);
+ }
+ }
+
public static final class ShortSumAgg extends SumAggregationFunction<Short> {
private static final long serialVersionUID = 1L;
-
+
private long agg;
@Override
@@ -71,10 +98,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
return (short) agg;
}
}
-
+
+ public static final class ShortValueSumAgg extends SumAggregationFunction<ShortValue> {
+ private static final long serialVersionUID = 1L;
+
+ private long agg;
+
+ @Override
+ public void initializeAggregate() {
+ agg = 0;
+ }
+
+ @Override
+ public void aggregate(ShortValue value) {
+ agg += value.getValue();
+ }
+
+ @Override
+ public ShortValue getAggregate() {
+ return new ShortValue((short) agg);
+ }
+ }
+
public static final class IntSumAgg extends SumAggregationFunction<Integer> {
private static final long serialVersionUID = 1L;
-
+
private long agg;
@Override
@@ -92,10 +140,31 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
return (int) agg;
}
}
-
+
+ public static final class IntValueSumAgg extends SumAggregationFunction<IntValue> {
+ private static final long serialVersionUID = 1L;
+
+ private long agg;
+
+ @Override
+ public void initializeAggregate() {
+ agg = 0;
+ }
+
+ @Override
+ public void aggregate(IntValue value) {
+ agg += value.getValue();
+ }
+
+ @Override
+ public IntValue getAggregate() {
+ return new IntValue((int) agg);
+ }
+ }
+
public static final class LongSumAgg extends SumAggregationFunction<Long> {
private static final long serialVersionUID = 1L;
-
+
private long agg;
@Override
@@ -113,11 +182,32 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
return agg;
}
}
-
+
+ public static final class LongValueSumAgg extends SumAggregationFunction<LongValue> {
+ private static final long serialVersionUID = 1L;
+
+ private long agg;
+
+ @Override
+ public void initializeAggregate() {
+ agg = 0L;
+ }
+
+ @Override
+ public void aggregate(LongValue value) {
+ agg += value.getValue();
+ }
+
+ @Override
+ public LongValue getAggregate() {
+ return new LongValue(agg);
+ }
+ }
+
public static final class FloatSumAgg extends SumAggregationFunction<Float> {
private static final long serialVersionUID = 1L;
-
- private float agg;
+
+ private double agg;
@Override
public void initializeAggregate() {
@@ -131,13 +221,34 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
@Override
public Float getAggregate() {
- return agg;
+ return (float) agg;
+ }
+ }
+
+ public static final class FloatValueSumAgg extends SumAggregationFunction<FloatValue> {
+ private static final long serialVersionUID = 1L;
+
+ private double agg;
+
+ @Override
+ public void initializeAggregate() {
+ agg = 0.0f;
+ }
+
+ @Override
+ public void aggregate(FloatValue value) {
+ agg += value.getValue();
+ }
+
+ @Override
+ public FloatValue getAggregate() {
+ return new FloatValue((float) agg);
}
}
-
+
public static final class DoubleSumAgg extends SumAggregationFunction<Double> {
private static final long serialVersionUID = 1L;
-
+
private double agg;
@Override
@@ -155,36 +266,75 @@ public abstract class SumAggregationFunction<T> extends AggregationFunction<T> {
return agg;
}
}
-
+
+ public static final class DoubleValueSumAgg extends SumAggregationFunction<DoubleValue> {
+ private static final long serialVersionUID = 1L;
+
+ private double agg;
+
+ @Override
+ public void initializeAggregate() {
+ agg = 0.0;
+ }
+
+ @Override
+ public void aggregate(DoubleValue value) {
+ agg += value.getValue();
+ }
+
+ @Override
+ public DoubleValue getAggregate() {
+ return new DoubleValue(agg);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
-
+
public static final class SumAggregationFunctionFactory implements AggregationFunctionFactory {
private static final long serialVersionUID = 1L;
-
+
@SuppressWarnings("unchecked")
@Override
public <T> AggregationFunction<T> createAggregationFunction(Class<T> type) {
if (type == Long.class) {
return (AggregationFunction<T>) new LongSumAgg();
}
+ else if (type == LongValue.class) {
+ return (AggregationFunction<T>) new LongValueSumAgg();
+ }
else if (type == Integer.class) {
return (AggregationFunction<T>) new IntSumAgg();
}
+ else if (type == IntValue.class) {
+ return (AggregationFunction<T>) new IntValueSumAgg();
+ }
else if (type == Double.class) {
return (AggregationFunction<T>) new DoubleSumAgg();
}
+ else if (type == DoubleValue.class) {
+ return (AggregationFunction<T>) new DoubleValueSumAgg();
+ }
else if (type == Float.class) {
return (AggregationFunction<T>) new FloatSumAgg();
}
+ else if (type == FloatValue.class) {
+ return (AggregationFunction<T>) new FloatValueSumAgg();
+ }
else if (type == Byte.class) {
return (AggregationFunction<T>) new ByteSumAgg();
}
+ else if (type == ByteValue.class) {
+ return (AggregationFunction<T>) new ByteValueSumAgg();
+ }
else if (type == Short.class) {
return (AggregationFunction<T>) new ShortSumAgg();
}
+ else if (type == ShortValue.class) {
+ return (AggregationFunction<T>) new ShortValueSumAgg();
+ }
else {
throw new UnsupportedAggregationTypeException("The type " + type.getName() +
- " has currently not supported for built-in sum aggregations.");
+ " is currently not supported for built-in sum aggregations.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index 0b4823e..5187de7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -51,7 +51,18 @@ import org.apache.flink.types.Value;
public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = 1L;
-
+
+ public static final ValueTypeInfo<BooleanValue> BOOLEAN_VALUE_TYPE_INFO = new ValueTypeInfo<>(BooleanValue.class);
+ public static final ValueTypeInfo<ByteValue> BYTE_VALUE_TYPE_INFO = new ValueTypeInfo<>(ByteValue.class);
+ public static final ValueTypeInfo<CharValue> CHAR_VALUE_TYPE_INFO = new ValueTypeInfo<>(CharValue.class);
+ public static final ValueTypeInfo<DoubleValue> DOUBLE_VALUE_TYPE_INFO = new ValueTypeInfo<>(DoubleValue.class);
+ public static final ValueTypeInfo<FloatValue> FLOAT_VALUE_TYPE_INFO = new ValueTypeInfo<>(FloatValue.class);
+ public static final ValueTypeInfo<IntValue> INT_VALUE_TYPE_INFO = new ValueTypeInfo<>(IntValue.class);
+ public static final ValueTypeInfo<LongValue> LONG_VALUE_TYPE_INFO = new ValueTypeInfo<>(LongValue.class);
+ public static final ValueTypeInfo<NullValue> NULL_VALUE_TYPE_INFO = new ValueTypeInfo<>(NullValue.class);
+ public static final ValueTypeInfo<ShortValue> SHORT_VALUE_TYPE_INFO = new ValueTypeInfo<>(ShortValue.class);
+ public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
+
private final Class<T> type;
public ValueTypeInfo(Class<T> type) {
http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index d02f228..fc01ce7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -25,7 +25,11 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.javaApiOperators.util.ValueCollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -62,6 +66,27 @@ public class AggregateITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testFullAggregateOfMutableValueTypes() throws Exception {
+ /*
+ * Full Aggregate of mutable value types
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds
+ .aggregate(Aggregations.SUM, 0)
+ .and(Aggregations.MAX, 1)
+ .project(0, 1);
+
+ List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+ String expected = "231,6\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
public void testGroupedAggregate() throws Exception {
/*
* Grouped Aggregate
@@ -87,6 +112,31 @@ public class AggregateITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testGroupedAggregateOfMutableValueTypes() throws Exception {
+ /*
+ * Grouped Aggregate of mutable value types
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple2<IntValue, LongValue>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.SUM, 0)
+ .project(1, 0);
+
+ List<Tuple2<IntValue, LongValue>> result = aggregateDs.collect();
+
+ String expected = "1,1\n" +
+ "2,5\n" +
+ "3,15\n" +
+ "4,34\n" +
+ "5,65\n" +
+ "6,111\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
public void testNestedAggregate() throws Exception {
/*
* Nested Aggregate
@@ -106,4 +156,25 @@ public class AggregateITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expected);
}
+
+ @Test
+ public void testNestedAggregateOfMutableValueTypes() throws Exception {
+ /*
+ * Nested Aggregate of mutable value types
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<IntValue, LongValue, StringValue>> ds = ValueCollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple1<IntValue>> aggregateDs = ds.groupBy(1)
+ .aggregate(Aggregations.MIN, 0)
+ .aggregate(Aggregations.MIN, 0)
+ .project(0);
+
+ List<Tuple1<IntValue>> result = aggregateDs.collect();
+
+ String expected = "1\n";
+
+ compareResultAsTuples(result, expected);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/da248b15/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
new file mode 100644
index 0000000..04a7bc5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/ValueCollectionDataSets.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.util;
+
+import java.io.File;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.io.IntWritable;
+
+import scala.math.BigInt;
+
+/**
+ * #######################################################################################################
+ *
+ * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
+ * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
+ *
+ * #######################################################################################################
+ */
+public class ValueCollectionDataSets {
+
+ public static DataSet<Tuple3<IntValue, LongValue, StringValue>> get3TupleDataSet(ExecutionEnvironment env) {
+ List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
+
+ data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi")));
+ data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello")));
+ data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world")));
+ data.add(new Tuple3<>(new IntValue(4), new LongValue(3l), new StringValue("Hello world, how are you?")));
+ data.add(new Tuple3<>(new IntValue(5), new LongValue(3l), new StringValue("I am fine.")));
+ data.add(new Tuple3<>(new IntValue(6), new LongValue(3l), new StringValue("Luke Skywalker")));
+ data.add(new Tuple3<>(new IntValue(7), new LongValue(4l), new StringValue("Comment#1")));
+ data.add(new Tuple3<>(new IntValue(8), new LongValue(4l), new StringValue("Comment#2")));
+ data.add(new Tuple3<>(new IntValue(9), new LongValue(4l), new StringValue("Comment#3")));
+ data.add(new Tuple3<>(new IntValue(10), new LongValue(4l), new StringValue("Comment#4")));
+ data.add(new Tuple3<>(new IntValue(11), new LongValue(5l), new StringValue("Comment#5")));
+ data.add(new Tuple3<>(new IntValue(12), new LongValue(5l), new StringValue("Comment#6")));
+ data.add(new Tuple3<>(new IntValue(13), new LongValue(5l), new StringValue("Comment#7")));
+ data.add(new Tuple3<>(new IntValue(14), new LongValue(5l), new StringValue("Comment#8")));
+ data.add(new Tuple3<>(new IntValue(15), new LongValue(5l), new StringValue("Comment#9")));
+ data.add(new Tuple3<>(new IntValue(16), new LongValue(6l), new StringValue("Comment#10")));
+ data.add(new Tuple3<>(new IntValue(17), new LongValue(6l), new StringValue("Comment#11")));
+ data.add(new Tuple3<>(new IntValue(18), new LongValue(6l), new StringValue("Comment#12")));
+ data.add(new Tuple3<>(new IntValue(19), new LongValue(6l), new StringValue("Comment#13")));
+ data.add(new Tuple3<>(new IntValue(20), new LongValue(6l), new StringValue("Comment#14")));
+ data.add(new Tuple3<>(new IntValue(21), new LongValue(6l), new StringValue("Comment#15")));
+
+ Collections.shuffle(data);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet<Tuple3<IntValue, LongValue, StringValue>> getSmall3TupleDataSet(ExecutionEnvironment env) {
+ List<Tuple3<IntValue, LongValue, StringValue>> data = new ArrayList<>();
+
+ data.add(new Tuple3<>(new IntValue(1), new LongValue(1l), new StringValue("Hi")));
+ data.add(new Tuple3<>(new IntValue(2), new LongValue(2l), new StringValue("Hello")));
+ data.add(new Tuple3<>(new IntValue(3), new LongValue(2l), new StringValue("Hello world")));
+
+ Collections.shuffle(data);
+
+ return env.fromCollection(data);
+ }
+
+ public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> get5TupleDataSet(ExecutionEnvironment env) {
+ List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
+
+ data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
+ data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
+ data.add(new Tuple5<>(new IntValue(3), new LongValue(4l), new IntValue(3), new StringValue("Hallo Welt wie gehts?"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(3), new LongValue(5l), new IntValue(4), new StringValue("ABC"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(3), new LongValue(6l), new IntValue(5), new StringValue("BCD"), new LongValue(3l)));
+ data.add(new Tuple5<>(new IntValue(4), new LongValue(7l), new IntValue(6), new StringValue("CDE"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(4), new LongValue(8l), new IntValue(7), new StringValue("DEF"), new LongValue(1l)));
+ data.add(new Tuple5<>(new IntValue(4), new LongValue(9l), new IntValue(8), new StringValue("EFG"), new LongValue(1l)));
+ data.add(new Tuple5<>(new IntValue(4), new LongValue(10l), new IntValue(9), new StringValue("FGH"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(5), new LongValue(11l), new IntValue(10), new StringValue("GHI"), new LongValue(1l)));
+ data.add(new Tuple5<>(new IntValue(5), new LongValue(12l), new IntValue(11), new StringValue("HIJ"), new LongValue(3l)));
+ data.add(new Tuple5<>(new IntValue(5), new LongValue(13l), new IntValue(12), new StringValue("IJK"), new LongValue(3l)));
+ data.add(new Tuple5<>(new IntValue(5), new LongValue(14l), new IntValue(13), new StringValue("JKL"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(5), new LongValue(15l), new IntValue(14), new StringValue("KLM"), new LongValue(2l)));
+
+ Collections.shuffle(data);
+
+ TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
+ TupleTypeInfo<>(
+ ValueTypeInfo.INT_VALUE_TYPE_INFO,
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+ ValueTypeInfo.INT_VALUE_TYPE_INFO,
+ ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ public static DataSet<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> getSmall5TupleDataSet(ExecutionEnvironment env) {
+ List<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> data = new ArrayList<>();
+
+ data.add(new Tuple5<>(new IntValue(1), new LongValue(1l), new IntValue(0), new StringValue("Hallo"), new LongValue(1l)));
+ data.add(new Tuple5<>(new IntValue(2), new LongValue(2l), new IntValue(1), new StringValue("Hallo Welt"), new LongValue(2l)));
+ data.add(new Tuple5<>(new IntValue(2), new LongValue(3l), new IntValue(2), new StringValue("Hallo Welt wie"), new LongValue(1l)));
+
+ Collections.shuffle(data);
+
+ TupleTypeInfo<Tuple5<IntValue, LongValue, IntValue, StringValue, LongValue>> type = new
+ TupleTypeInfo<>(
+ ValueTypeInfo.INT_VALUE_TYPE_INFO,
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO,
+ ValueTypeInfo.INT_VALUE_TYPE_INFO,
+ ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+ ValueTypeInfo.LONG_VALUE_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getSmallNestedTupleDataSet(ExecutionEnvironment env) {
+ List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> data = new ArrayList<>();
+
+ data.add(new Tuple2<>(new Tuple2<>(new IntValue(1), new IntValue(1)), new StringValue("one")));
+ data.add(new Tuple2<>(new Tuple2<>(new IntValue(2), new IntValue(2)), new StringValue("two")));
+ data.add(new Tuple2<>(new Tuple2<>(new IntValue(3), new IntValue(3)), new StringValue("three")));
+
+ TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> type = new
+ TupleTypeInfo<>(
+ new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+ ValueTypeInfo.STRING_VALUE_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ /**
+  * Returns seven nested tuples {@code ((k1, k2), label)} in a fixed order, several sharing the
+  * same {@code k1} (presumably for group-sorted tests, per the method name).
+  */
+ public static DataSet<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> getGroupSortedNestedTupleDataSet(ExecutionEnvironment env) {
+ 	final int[][] keys = {{1, 3}, {1, 2}, {2, 1}, {2, 2}, {3, 3}, {3, 6}, {4, 9}};
+ 	final String[] labels = {"a", "a", "a", "b", "c", "c", "c"};
+
+ 	List<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> tuples = new ArrayList<>(keys.length);
+ 	for (int row = 0; row < keys.length; row++) {
+ 		Tuple2<IntValue, IntValue> key = new Tuple2<>(new IntValue(keys[row][0]), new IntValue(keys[row][1]));
+ 		tuples.add(new Tuple2<>(key, new StringValue(labels[row])));
+ 	}
+
+ 	TupleTypeInfo<Tuple2<IntValue, IntValue>> keyType =
+ 		new TupleTypeInfo<>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO);
+ 	TupleTypeInfo<Tuple2<Tuple2<IntValue, IntValue>, StringValue>> rowType =
+ 		new TupleTypeInfo<>(keyType, ValueTypeInfo.STRING_VALUE_TYPE_INFO);
+
+ 	return env.fromCollection(tuples, rowType);
+ }
+
+ public static DataSet<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {
+ List<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> data = new ArrayList<>();
+
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(3)), new StringValue("a"), new IntValue(2)));
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(1), new IntValue(2)), new StringValue("a"), new IntValue(1)));
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(1)), new StringValue("a"), new IntValue(3)));
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(2), new IntValue(2)), new StringValue("b"), new IntValue(4)));
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(3)), new StringValue("c"), new IntValue(5)));
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(3), new IntValue(6)), new StringValue("c"), new IntValue(6)));
+ data.add(new Tuple3<>(new Tuple2<IntValue, IntValue>(new IntValue(4), new IntValue(9)), new StringValue("c"), new IntValue(7)));
+
+ TupleTypeInfo<Tuple3<Tuple2<IntValue, IntValue>, StringValue, IntValue>> type = new
+ TupleTypeInfo<>(
+ new TupleTypeInfo<Tuple2<IntValue, IntValue>>(ValueTypeInfo.INT_VALUE_TYPE_INFO, ValueTypeInfo.INT_VALUE_TYPE_INFO),
+ ValueTypeInfo.STRING_VALUE_TYPE_INFO,
+ ValueTypeInfo.INT_VALUE_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
+
+ /** Returns eight {@link StringValue} sentences in a randomly shuffled order. */
+ public static DataSet<StringValue> getStringDataSet(ExecutionEnvironment env) {
+ 	final String[] sentences = {
+ 		"Hi",
+ 		"Hello",
+ 		"Hello world",
+ 		"Hello world, how are you?",
+ 		"I am fine.",
+ 		"Luke Skywalker",
+ 		"Random comment",
+ 		"LOL"
+ 	};
+
+ 	List<StringValue> values = new ArrayList<>();
+ 	for (String sentence : sentences) {
+ 		values.add(new StringValue(sentence));
+ 	}
+ 	// unseeded shuffle: element order differs between runs
+ 	Collections.shuffle(values);
+
+ 	return env.fromCollection(values);
+ }
+
+ /**
+  * Returns the values 1..5 where each value {@code v} occurs exactly {@code v} times
+  * (15 elements total), in a randomly shuffled order.
+  */
+ public static DataSet<IntValue> getIntDataSet(ExecutionEnvironment env) {
+ 	List<IntValue> numbers = new ArrayList<>();
+
+ 	for (int value = 1; value <= 5; value++) {
+ 		for (int copy = 0; copy < value; copy++) {
+ 			numbers.add(new IntValue(value));
+ 		}
+ 	}
+ 	// unseeded shuffle: element order differs between runs
+ 	Collections.shuffle(numbers);
+
+ 	return env.fromCollection(numbers);
+ }
+
+ public static DataSet<CustomType> getCustomTypeDataSet(ExecutionEnvironment env) {
+ List<CustomType> data = new ArrayList<CustomType>();
+
+ data.add(new CustomType(1, 0l, "Hi"));
+ data.add(new CustomType(2, 1l, "Hello"));
+ data.add(new CustomType(2, 2l, "Hello world"));
+ data.add(new CustomType(3, 3l, "Hello world, how are you?"));
+ data.add(new CustomType(3, 4l, "I am fine."));
+ data.add(new CustomType(3, 5l, "Luke Skywalker"));
+ data.add(new CustomType(4, 6l, "Comment#1"));
+ data.add(new CustomType(4, 7l, "Comment#2"));
+ data.add(new CustomType(4, 8l, "Comment#3"));
+ data.add(new CustomType(4, 9l, "Comment#4"));
+ data.add(new CustomType(5, 10l, "Comment#5"));
+ data.add(new CustomType(5, 11l, "Comment#6"));
+ data.add(new CustomType(5, 12l, "Comment#7"));
+ data.add(new CustomType(5, 13l, "Comment#8"));
+ data.add(new CustomType(5, 14l, "Comment#9"));
+ data.add(new CustomType(6, 15l, "Comment#10"));
+ data.add(new CustomType(6, 16l, "Comment#11"));
+ data.add(new CustomType(6, 17l, "Comment#12"));
+ data.add(new CustomType(6, 18l, "Comment#13"));
+ data.add(new CustomType(6, 19l, "Comment#14"));
+ data.add(new CustomType(6, 20l, "Comment#15"));
+
+ Collections.shuffle(data);
+
+ return env.fromCollection(data);
+ }
+
+ /** Returns the first three elements of {@link #getCustomTypeDataSet}, randomly shuffled. */
+ public static DataSet<CustomType> getSmallCustomTypeDataSet(ExecutionEnvironment env) {
+ 	List<CustomType> records = new ArrayList<CustomType>();
+ 	Collections.addAll(records,
+ 		new CustomType(1, 0L, "Hi"),
+ 		new CustomType(2, 1L, "Hello"),
+ 		new CustomType(2, 2L, "Hello world"));
+
+ 	Collections.shuffle(records);
+ 	return env.fromCollection(records);
+ }
+
+ /** Serializable test type holding an int, a long and a string as Flink value types. */
+ public static class CustomType implements Serializable {
+
+ 	private static final long serialVersionUID = 1L;
+
+ 	public IntValue myInt;
+ 	public LongValue myLong;
+ 	public StringValue myString;
+
+ 	/** No-arg constructor; leaves all fields null. */
+ 	public CustomType() {
+ 	}
+
+ 	/** Wraps the given int, long and string into their value-type counterparts. */
+ 	public CustomType(int i, long l, String s) {
+ 		this.myInt = new IntValue(i);
+ 		this.myLong = new LongValue(l);
+ 		this.myString = new StringValue(s);
+ 	}
+
+ 	/** Renders the fields comma separated: {@code myInt,myLong,myString}. */
+ 	@Override
+ 	public String toString() {
+ 		return new StringBuilder()
+ 			.append(myInt).append(',')
+ 			.append(myLong).append(',')
+ 			.append(myString)
+ 			.toString();
+ 	}
+ }
+
+ /**
+  * Orders {@link CustomType}s ascending by {@code myInt}, then {@code myLong}, then
+  * {@code myString}.
+  *
+  * <p>Uses {@link Integer#compare} and {@link Long#compare} instead of subtraction. Subtracting
+  * ints can overflow, and narrowing a long difference to int can flip its sign or collapse a
+  * non-zero difference to 0 (e.g. a difference of 2^32). Either case breaks the
+  * {@link Comparator} contract.
+  */
+ public static class CustomTypeComparator implements Comparator<CustomType> {
+
+ 	@Override
+ 	public int compare(CustomType o1, CustomType o2) {
+ 		int cmp = Integer.compare(o1.myInt.getValue(), o2.myInt.getValue());
+ 		if (cmp != 0) {
+ 			return cmp;
+ 		}
+ 		cmp = Long.compare(o1.myLong.getValue(), o2.myLong.getValue());
+ 		if (cmp != 0) {
+ 			return cmp;
+ 		}
+ 		return o1.myString.getValue().compareTo(o2.myString.getValue());
+ 	}
+
+ }
+
+ /**
+  * Returns three 7-tuples; row {@code i} (1..3) is
+  * {@code (i, ordinal, 10*i, 100*i, 1000*i, cardinal, 10000*i)}.
+  */
+ public static DataSet<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> getSmallTuplebasedDataSet(ExecutionEnvironment env) {
+ 	final String[] ordinals = {"First", "Second", "Third"};
+ 	final String[] cardinals = {"One", "Two", "Three"};
+
+ 	List<Tuple7<IntValue, StringValue, IntValue, IntValue, LongValue, StringValue, LongValue>> rows = new ArrayList<>();
+ 	for (int i = 1; i <= ordinals.length; i++) {
+ 		rows.add(new Tuple7<>(
+ 			new IntValue(i),
+ 			new StringValue(ordinals[i - 1]),
+ 			new IntValue(10 * i),
+ 			new IntValue(100 * i),
+ 			new LongValue(1000L * i),
+ 			new StringValue(cardinals[i - 1]),
+ 			new LongValue(10000L * i)));
+ 	}
+
+ 	return env.fromCollection(rows);
+ }
+
+ /**
+  * Returns three 7-tuples whose fields are ordered like {@link POJO}'s flattened fields;
+  * row {@code i} (1..3) is {@code (10000*i, 10*i, 100*i, 1000*i, cardinal, i, ordinal)}.
+  */
+ public static DataSet<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
+ 	final String[] ordinals = {"First", "Second", "Third"};
+ 	final String[] cardinals = {"One", "Two", "Three"};
+
+ 	List<Tuple7<LongValue, IntValue, IntValue, LongValue, StringValue, IntValue, StringValue>> rows = new ArrayList<>();
+ 	for (int i = 1; i <= ordinals.length; i++) {
+ 		rows.add(new Tuple7<>(
+ 			new LongValue(10000L * i),
+ 			new IntValue(10 * i),
+ 			new IntValue(100 * i),
+ 			new LongValue(1000L * i),
+ 			new StringValue(cardinals[i - 1]),
+ 			new IntValue(i),
+ 			new StringValue(ordinals[i - 1])));
+ 	}
+
+ 	return env.fromCollection(rows);
+ }
+
+ /**
+  * Returns three {@link POJO}s; element {@code i} (1..3) is built as
+  * {@code POJO(number=i, str=ordinal, f0=10*i, f1.myInt=100*i, f1.myLong=1000*i,
+  * f1.myString=cardinal, nestedPojo.longNumber=10000*i)}.
+  */
+ public static DataSet<POJO> getSmallPojoDataSet(ExecutionEnvironment env) {
+ 	final String[] ordinals = {"First", "Second", "Third"};
+ 	final String[] cardinals = {"One", "Two", "Three"};
+
+ 	List<POJO> pojos = new ArrayList<POJO>();
+ 	for (int i = 1; i <= ordinals.length; i++) {
+ 		pojos.add(new POJO(i, ordinals[i - 1], 10 * i, 100 * i, 1000L * i, cardinals[i - 1], 10000L * i));
+ 	}
+
+ 	return env.fromCollection(pojos);
+ }
+
+ /**
+  * Returns eight {@link POJO}s with repeats: five identical "First" elements, one "Second",
+  * then two identical "Third" elements (each a separate instance).
+  */
+ public static DataSet<POJO> getDuplicatePojoDataSet(ExecutionEnvironment env) {
+ 	List<POJO> pojos = new ArrayList<POJO>();
+
+ 	for (int copy = 0; copy < 5; copy++) {
+ 		pojos.add(new POJO(1, "First", 10, 100, 1000L, "One", 10000L));
+ 	}
+ 	pojos.add(new POJO(2, "Second", 20, 200, 2000L, "Two", 20000L));
+ 	for (int copy = 0; copy < 2; copy++) {
+ 		pojos.add(new POJO(3, "Third", 30, 300, 3000L, "Three", 30000L));
+ 	}
+
+ 	return env.fromCollection(pojos);
+ }
+
+ /**
+  * Returns eight pairwise-distinct {@link POJO}s.
+  *
+  * <p>{@code number} runs 1..8, so no two elements are equal. The other fields ({@code str},
+  * the nested tuple values and {@code nestedPojo.longNumber}) repeat across elements in
+  * different combinations.
+  */
+ public static DataSet<POJO> getMixedPojoDataSet(ExecutionEnvironment env) {
+ 	List<POJO> data = new ArrayList<POJO>();
+
+ 	data.add(new POJO(1, "First", 10, 100, 1000L, "One", 10100L));
+ 	data.add(new POJO(2, "First_", 10, 105, 1000L, "One", 10200L));
+ 	data.add(new POJO(3, "First", 11, 102, 3000L, "One", 10200L));
+ 	data.add(new POJO(4, "First_", 11, 106, 1000L, "One", 10300L));
+ 	data.add(new POJO(5, "First", 11, 102, 2000L, "One", 10100L));
+ 	data.add(new POJO(6, "Second_", 20, 200, 2000L, "Two", 10100L));
+ 	data.add(new POJO(7, "Third", 31, 301, 2000L, "Three", 10200L));
+ 	data.add(new POJO(8, "Third_", 30, 300, 1000L, "Three", 10100L));
+
+ 	return env.fromCollection(data);
+ }
+
+ /**
+  * Test POJO that combines value-typed fields, a nested tuple containing a {@link CustomType},
+  * a nested POJO and a transient field.
+  */
+ public static class POJO {
+ 	public IntValue number;
+ 	public StringValue str;
+ 	public Tuple2<IntValue, CustomType> nestedTupleWithCustom;
+ 	public NestedPojo nestedPojo;
+ 	// NOTE(review): transient, presumably so Flink's POJO analysis skips it; never set in this class.
+ 	public transient LongValue ignoreMe;
+
+ 	/**
+ 	 * Builds a fully populated instance.
+ 	 *
+ 	 * @param i0 value of {@code number}
+ 	 * @param s0 value of {@code str}
+ 	 * @param i1 field f0 of {@code nestedTupleWithCustom}
+ 	 * @param i2 {@code myInt} of the nested {@link CustomType}
+ 	 * @param l0 {@code myLong} of the nested {@link CustomType}
+ 	 * @param s1 {@code myString} of the nested {@link CustomType}
+ 	 * @param l1 {@code longNumber} of {@code nestedPojo}
+ 	 */
+ 	public POJO(int i0, String s0,
+ 			int i1, int i2, long l0, String s1,
+ 			long l1) {
+ 		this.number = new IntValue(i0);
+ 		this.str = new StringValue(s0);
+ 		this.nestedTupleWithCustom = new Tuple2<>(new IntValue(i1), new CustomType(i2, l0, s1));
+ 		this.nestedPojo = new NestedPojo();
+ 		this.nestedPojo.longNumber = new LongValue(l1);
+ 	}
+
+ 	/** No-arg constructor; leaves every field null. */
+ 	public POJO() {
+ 	}
+
+ 	/**
+ 	 * Renders {@code number str nestedTupleWithCustom nestedPojo.longNumber}, separated by
+ 	 * spaces. A null {@code nestedPojo} (e.g. after the no-arg constructor) prints as
+ 	 * {@code null} instead of throwing a NullPointerException.
+ 	 */
+ 	@Override
+ 	public String toString() {
+ 		LongValue nestedLong = nestedPojo == null ? null : nestedPojo.longNumber;
+ 		return number + " " + str + " " + nestedTupleWithCustom + " " + nestedLong;
+ 	}
+ }
+
+ /**
+  * POJO nested inside {@link POJO}, holding a single {@link LongValue}. The static
+  * {@code ignoreMe} is not an instance field (presumably not part of the POJO type — confirm).
+  */
+ public static class NestedPojo {
+ 	public static Object ignoreMe;
+ 	public LongValue longNumber;
+
+ 	/** No-arg constructor; leaves {@code longNumber} null. */
+ 	public NestedPojo() {}
+ }
+
+ /**
+  * Returns six {@link CrazyNested}s whose innermost string is "aa" once, "bb" twice and
+  * "cc" three times, in that order.
+  */
+ public static DataSet<CrazyNested> getCrazyNestedDataSet(ExecutionEnvironment env) {
+ 	final String[] keys = {"aa", "bb", "cc"};
+
+ 	List<CrazyNested> nested = new ArrayList<CrazyNested>();
+ 	for (int k = 0; k < keys.length; k++) {
+ 		// key at index k is repeated k + 1 times
+ 		for (int copy = 0; copy <= k; copy++) {
+ 			nested.add(new CrazyNested(keys[k]));
+ 		}
+ 	}
+
+ 	return env.fromCollection(nested);
+ }
+
+ public static class CrazyNested {
+ public CrazyNestedL1 nest_Lvl1;
+ public LongValue something; // test proper null-value handling
+
+ public CrazyNested() {
+ }
+
+ public CrazyNested(String set, String second, long s) { // additional CTor to set all fields to non-null values
+ this(set);
+ something = new LongValue(s);
+ nest_Lvl1.a = new StringValue(second);
+ }
+
+ public CrazyNested(String set) {
+ nest_Lvl1 = new CrazyNestedL1();
+ nest_Lvl1.nest_Lvl2 = new CrazyNestedL2();
+ nest_Lvl1.nest_Lvl2.nest_Lvl3 = new CrazyNestedL3();
+ nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4 = new CrazyNestedL4();
+ nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal = new StringValue(set);
+ }
+ }
+
+ /** First nesting level of {@link CrazyNested}; {@code a} and {@code b} are optional leaves. */
+ public static class CrazyNestedL1 {
+ 	public StringValue a;
+ 	public IntValue b;
+ 	public CrazyNestedL2 nest_Lvl2;
+ }
+
+ /** Second nesting level of {@link CrazyNested}. */
+ public static class CrazyNestedL2 {
+ 	public CrazyNestedL3 nest_Lvl3;
+ }
+
+ /** Third nesting level of {@link CrazyNested}. */
+ public static class CrazyNestedL3 {
+ 	public CrazyNestedL4 nest_Lvl4;
+ }
+
+ /** Innermost level of {@link CrazyNested}, holding the leaf string. */
+ public static class CrazyNestedL4 {
+ 	public StringValue f1nal;
+ }
+
+ /**
+  * POJO extending {@link Tuple3} with one extra public field.
+  * Copied from TypeExtractorTest.
+  */
+ public static class FromTuple extends Tuple3<StringValue, StringValue, LongValue> {
+ 	private static final long serialVersionUID = 1L;
+
+ 	public IntValue special;
+ }
+
+ /** {@link FromTuple} with a convenience constructor for {@code special} and tuple field f2. */
+ public static class FromTupleWithCTor extends FromTuple {
+
+ 	private static final long serialVersionUID = 1L;
+
+ 	/** No-arg constructor; leaves all fields null. */
+ 	public FromTupleWithCTor() {
+ 	}
+
+ 	/** Sets {@code special} and the third tuple field (f2); f0 and f1 stay null. */
+ 	public FromTupleWithCTor(int special, long tupleField) {
+ 		this.f2 = new LongValue(tupleField);
+ 		this.special = new IntValue(special);
+ 	}
+ }
+
+ /** Returns three {@code (1, 10)} elements followed by two {@code (2, 20)} elements. */
+ public static DataSet<FromTupleWithCTor> getPojoExtendingFromTuple(ExecutionEnvironment env) {
+ 	List<FromTupleWithCTor> elements = new ArrayList<>();
+ 	for (int copy = 0; copy < 3; copy++) {
+ 		elements.add(new FromTupleWithCTor(1, 10L));
+ 	}
+ 	for (int copy = 0; copy < 2; copy++) {
+ 		elements.add(new FromTupleWithCTor(2, 20L));
+ 	}
+ 	return env.fromCollection(elements);
+ }
+
+ /**
+  * POJO mixing value-typed fields, a Hadoop {@link IntWritable} and a nested tuple.
+  * The convenience constructor leaves {@code someString} null.
+  */
+ public static class PojoContainingTupleAndWritable {
+ 	public IntValue someInt;
+ 	public StringValue someString;
+ 	public IntWritable hadoopFan;
+ 	public Tuple2<LongValue, LongValue> theTuple;
+
+ 	/** No-arg constructor; leaves all fields null. */
+ 	public PojoContainingTupleAndWritable() {
+ 	}
+
+ 	/** Sets {@code someInt} and {@code hadoopFan} to {@code i} and {@code theTuple} to (l1, l2). */
+ 	public PojoContainingTupleAndWritable(int i, long l1, long l2) {
+ 		this.someInt = new IntValue(i);
+ 		this.hadoopFan = new IntWritable(i);
+ 		this.theTuple = new Tuple2<LongValue, LongValue>(new LongValue(l1), new LongValue(l2));
+ 	}
+ }
+
+ /** Returns one {@code (1, 10, 100)} element followed by five identical {@code (2, 20, 200)} elements. */
+ public static DataSet<PojoContainingTupleAndWritable> getPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+ 	List<PojoContainingTupleAndWritable> pojos = new ArrayList<>();
+ 	pojos.add(new PojoContainingTupleAndWritable(1, 10L, 100L));
+ 	for (int copy = 0; copy < 5; copy++) {
+ 		pojos.add(new PojoContainingTupleAndWritable(2, 20L, 200L));
+ 	}
+ 	return env.fromCollection(pojos);
+ }
+
+
+
+ /**
+  * Returns one element with key 1 followed by five elements with key 2 whose tuple values
+  * differ, in a fixed order.
+  */
+ public static DataSet<PojoContainingTupleAndWritable> getGroupSortedPojoContainingTupleAndWritable(ExecutionEnvironment env) {
+ 	// (theTuple.f0, theTuple.f1) for the key-2 elements, in insertion order
+ 	final long[][] secondGroup = {{20L, 200L}, {20L, 201L}, {30L, 200L}, {30L, 600L}, {30L, 400L}};
+
+ 	List<PojoContainingTupleAndWritable> pojos = new ArrayList<>();
+ 	pojos.add(new PojoContainingTupleAndWritable(1, 10L, 100L));
+ 	for (long[] pair : secondGroup) {
+ 		pojos.add(new PojoContainingTupleAndWritable(2, pair[0], pair[1]));
+ 	}
+ 	return env.fromCollection(pojos);
+ }
+
+ /**
+  * Returns three identical key-1 tuples and one key-2 tuple. The key-2 tuple's POJO is
+  * not initialized according to its first two fields; it reuses the key-1 POJO values.
+  */
+ public static DataSet<Tuple3<IntValue, CrazyNested, POJO>> getTupleContainingPojos(ExecutionEnvironment env) {
+ 	List<Tuple3<IntValue, CrazyNested, POJO>> tuples = new ArrayList<>();
+	for (int copy = 0; copy < 3; copy++) {
+ 		tuples.add(new Tuple3<IntValue, CrazyNested, POJO>(
+ 			new IntValue(1),
+ 			new CrazyNested("one", "uno", 1L),
+ 			new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+ 	}
+ 	tuples.add(new Tuple3<IntValue, CrazyNested, POJO>(
+ 		new IntValue(2),
+ 		new CrazyNested("two", "duo", 2L),
+ 		new POJO(1, "First", 10, 100, 1000L, "One", 10000L)));
+ 	return env.fromCollection(tuples);
+ }
+
+ /** Simple POJO with two {@link StringValue} fields. */
+ public static class Pojo1 {
+ 	public StringValue a;
+ 	public StringValue b;
+
+ 	/** No-arg constructor; leaves both fields null. */
+ 	public Pojo1() {
+ 	}
+
+ 	/** Wraps the two strings into {@link StringValue}s. */
+ 	public Pojo1(String first, String second) {
+ 		a = new StringValue(first);
+ 		b = new StringValue(second);
+ 	}
+ }
+
+ /** POJO with two {@link StringValue} fields, named differently from {@link Pojo1}'s. */
+ public static class Pojo2 {
+ 	public StringValue a2;
+ 	public StringValue b2;
+ }
+
+ public static class PojoWithMultiplePojos {
+ public Pojo1 p1;
+ public Pojo2 p2;
+ public IntValue i0;
+
+ public PojoWithMultiplePojos() {
+ }
+
+ public PojoWithMultiplePojos(String a, String b, String a1, String b1, int i0) {
+ p1 = new Pojo1();
+ p1.a = new StringValue(a);
+ p1.b = new StringValue(b);
+ p2 = new Pojo2();
+ p2.a2 = new StringValue(a1);
+ p2.b2 = new StringValue(b1);
+ this.i0 = new IntValue(i0);
+ }
+ }
+
+ public static DataSet<PojoWithMultiplePojos> getPojoWithMultiplePojos(ExecutionEnvironment env) {
+ List<PojoWithMultiplePojos> data = new ArrayList<>();
+ data.add(new PojoWithMultiplePojos("a", "aa", "b", "bb", 1));
+ data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+ data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+ data.add(new PojoWithMultiplePojos("b", "bb", "c", "cc", 2));
+ data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+ data.add(new PojoWithMultiplePojos("d", "dd", "e", "ee", 3));
+ return env.fromCollection(data);
+ }
+
+ public enum Category {
+ CAT_A, CAT_B;
+ }
+
+ /** POJO with a {@link StringValue} group, a {@link Date} and a {@link Category}. */
+ public static class PojoWithDateAndEnum {
+ 	public StringValue group;
+ 	public Date date;
+ 	public Category cat;
+ }
+
+ /**
+  * Returns three {@link PojoWithDateAndEnum}s, all with {@code new Date(666)}: two in group
+  * "a" with {@link Category#CAT_A} and one in group "b" with {@link Category#CAT_B}.
+  */
+ public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
+ 	final String[] groups = {"a", "a", "b"};
+ 	final Category[] categories = {Category.CAT_A, Category.CAT_A, Category.CAT_B};
+
+ 	List<PojoWithDateAndEnum> result = new ArrayList<PojoWithDateAndEnum>();
+ 	for (int i = 0; i < groups.length; i++) {
+ 		PojoWithDateAndEnum pojo = new PojoWithDateAndEnum();
+ 		pojo.group = new StringValue(groups[i]);
+ 		pojo.date = new Date(666);
+ 		pojo.cat = categories[i];
+ 		result.add(pojo);
+ 	}
+
+ 	return env.fromCollection(result);
+ }
+
+ public static class PojoWithCollection {
+ public List<Pojo1> pojos;
+ public IntValue key;
+ public java.sql.Date sqlDate;
+ public BigInteger bigInt;
+ public BigDecimal bigDecimalKeepItNull;
+ public BigInt scalaBigInt;
+ public List<Object> mixed;
+
+ @Override
+ public String toString() {
+ return "PojoWithCollection{" +
+ "pojos.size()=" + pojos.size() +
+ ", key=" + key +
+ ", sqlDate=" + sqlDate +
+ ", bigInt=" + bigInt +
+ ", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+ ", scalaBigInt=" + scalaBigInt +
+ ", mixed=" + mixed +
+ '}';
+ }
+ }
+
+ public static class PojoWithCollectionGeneric {
+ public List<Pojo1> pojos;
+ public IntValue key;
+ public java.sql.Date sqlDate;
+ public BigInteger bigInt;
+ public BigDecimal bigDecimalKeepItNull;
+ public BigInt scalaBigInt;
+ public List<Object> mixed;
+ private PojoWithDateAndEnum makeMeGeneric;
+
+ @Override
+ public String toString() {
+ return "PojoWithCollection{" +
+ "pojos.size()=" + pojos.size() +
+ ", key=" + key +
+ ", sqlDate=" + sqlDate +
+ ", bigInt=" + bigInt +
+ ", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+ ", scalaBigInt=" + scalaBigInt +
+ ", mixed=" + mixed +
+ '}';
+ }
+ }
+
+ /**
+  * Returns two {@link PojoWithCollection}s, both with key 0 and a null
+  * {@code bigDecimalKeepItNull}. The first has a populated {@code mixed} list (a map, a
+  * {@link File} and a string). The second leaves {@code mixed} null.
+  */
+ public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+ 	PojoWithCollection first = new PojoWithCollection();
+ 	first.pojos = new ArrayList<>();
+ 	first.pojos.add(new Pojo1("a", "aa"));
+ 	first.pojos.add(new Pojo1("b", "bb"));
+ 	first.key = new IntValue(0);
+ 	first.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ 	first.scalaBigInt = BigInt.int2bigInt(10);
+ 	first.bigDecimalKeepItNull = null;
+ 	// Build the date with a calendar (local midnight in the JVM's default time zone) rather than
+ 	// from fixed millis, so the printed date stays the same across time zones.
+ 	// Calendar months are zero-based: this is 18 May 2033.
+ 	first.sqlDate = new java.sql.Date(new GregorianCalendar(2033, GregorianCalendar.MAY, 18).getTimeInMillis());
+ 	Map<StringValue, IntValue> map = new HashMap<>();
+ 	map.put(new StringValue("someKey"), new IntValue(1));
+ 	first.mixed = new ArrayList<Object>();
+ 	first.mixed.add(map);
+ 	first.mixed.add(new File("/this/is/wrong"));
+ 	first.mixed.add("uhlala");
+
+ 	PojoWithCollection second = new PojoWithCollection();
+ 	second.pojos = new ArrayList<>();
+ 	second.pojos.add(new Pojo1("a2", "aa2"));
+ 	second.pojos.add(new Pojo1("b2", "bb2"));
+ 	second.key = new IntValue(0);
+ 	second.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ 	second.scalaBigInt = BigInt.int2bigInt(31104000);
+ 	second.bigDecimalKeepItNull = null;
+ 	// 3 May 1976 (zero-based month)
+ 	second.sqlDate = new java.sql.Date(new GregorianCalendar(1976, GregorianCalendar.MAY, 3).getTimeInMillis());
+
+ 	List<PojoWithCollection> data = new ArrayList<>();
+ 	data.add(first);
+ 	data.add(second);
+ 	return env.fromCollection(data);
+ }
+
+}
+