You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/22 13:03:59 UTC
[2/3] flink git commit: [FLINK-703] [java api] Use complete element
as join key
[FLINK-703] [java api] Use complete element as join key
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30a74c76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30a74c76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30a74c76
Branch: refs/heads/master
Commit: 30a74c768b88043c173053f5d37bfb0fde85f149
Parents: e1618e2
Author: Chiwan Park <ch...@icloud.com>
Authored: Mon Apr 6 03:18:23 2015 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 21 19:03:15 2015 +0200
----------------------------------------------------------------------
.../operators/base/CoGroupOperatorBase.java | 10 ++-
.../base/GroupCombineOperatorBase.java | 42 +++++----
.../operators/base/GroupReduceOperatorBase.java | 41 +++++----
.../apache/flink/api/java/operators/Keys.java | 45 ++++++----
.../api/java/operator/CoGroupOperatorTest.java | 94 +++++++++++++++++---
.../flink/api/java/operator/GroupingTest.java | 34 ++++++-
.../api/java/operator/JoinOperatorTest.java | 72 +++++++++++++++
.../optimizer/postpass/JavaApiPostPass.java | 5 --
.../test/javaApiOperators/CoGroupITCase.java | 82 ++++++++++++++++-
.../javaApiOperators/GroupReduceITCase.java | 38 ++++++--
.../flink/test/javaApiOperators/JoinITCase.java | 31 +++++++
11 files changed, 406 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index dbebeb4..7be5650 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
@@ -273,12 +274,15 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
return result;
}
+ @SuppressWarnings("unchecked")
private <T> TypeComparator<T> getTypeComparator(ExecutionConfig executionConfig, TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
- if (!(inputType instanceof CompositeType)) {
- throw new InvalidProgramException("Input types of coGroup must be composite types.");
+ if (inputType instanceof CompositeType) {
+ return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
+ } else if (inputType instanceof AtomicType) {
+ return ((AtomicType<T>) inputType).createComparator(inputSortDirections[0], executionConfig);
}
- return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0, executionConfig);
+ throw new InvalidProgramException("Input type of coGroup must be one of composite types or atomic types.");
}
private static class CoGroupSortListIterator<IN1, IN2> {
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
index 27fbc1c..c7ba92b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupCombineOperatorBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -77,6 +78,16 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
return this.groupOrder;
}
+ private TypeComparator<IN> getTypeComparator(TypeInformation<IN> typeInfo, int[] sortColumns, boolean[] sortOrderings, ExecutionConfig executionConfig) {
+ if (typeInfo instanceof CompositeType) {
+ return ((CompositeType<IN>) typeInfo).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+ } else if (typeInfo instanceof AtomicType) {
+ return ((AtomicType<IN>) typeInfo).createComparator(sortOrderings[0], executionConfig);
+ }
+
+ throw new InvalidProgramException("Input type of GroupCombine must be one of composite types or atomic types.");
+ }
+
// --------------------------------------------------------------------------------------------
@Override
@@ -87,11 +98,6 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
TypeInformation<IN> inputType = operatorInfo.getInputType();
int[] keyColumns = getKeyColumns(0);
-
- if (!(inputType instanceof CompositeType) && (keyColumns.length > 0 || groupOrder != null)) {
- throw new InvalidProgramException("Grouping or group-sorting is only possible on composite type.");
- }
-
int[] sortColumns = keyColumns;
boolean[] sortOrderings = new boolean[sortColumns.length];
@@ -100,19 +106,17 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
sortOrderings = ArrayUtils.addAll(sortOrderings, groupOrder.getFieldSortDirections());
}
- if (inputType instanceof CompositeType) {
- if(sortColumns.length == 0) { // => all reduce. No comparator
- Preconditions.checkArgument(sortOrderings.length == 0);
- } else {
- final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
-
- Collections.sort(inputData, new Comparator<IN>() {
- @Override
- public int compare(IN o1, IN o2) {
- return sortComparator.compare(o1, o2);
- }
- });
- }
+ if(sortColumns.length == 0) { // => all reduce. No comparator
+ Preconditions.checkArgument(sortOrderings.length == 0);
+ } else {
+ final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
+
+ Collections.sort(inputData, new Comparator<IN>() {
+ @Override
+ public int compare(IN o1, IN o2) {
+ return sortComparator.compare(o1, o2);
+ }
+ });
}
FunctionUtils.setFunctionRuntimeContext(function, ctx);
@@ -133,7 +137,7 @@ public class GroupCombineOperatorBase<IN, OUT, FT extends GroupCombineFunction<I
} else {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
boolean[] keyOrderings = new boolean[keyColumns.length];
- final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
+ final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index 57f07f3..3056fe7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -148,6 +149,16 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
public Partitioner<?> getCustomPartitioner() {
return customPartitioner;
}
+
+ private TypeComparator<IN> getTypeComparator(TypeInformation<IN> typeInfo, int[] sortColumns, boolean[] sortOrderings, ExecutionConfig executionConfig) {
+ if (typeInfo instanceof CompositeType) {
+ return ((CompositeType<IN>) typeInfo).createComparator(sortColumns, sortOrderings, 0, executionConfig);
+ } else if (typeInfo instanceof AtomicType) {
+ return ((AtomicType<IN>) typeInfo).createComparator(sortOrderings[0], executionConfig);
+ }
+
+ throw new InvalidProgramException("Input type of GroupReduce must be one of composite types or atomic types.");
+ }
// --------------------------------------------------------------------------------------------
@@ -159,11 +170,6 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
TypeInformation<IN> inputType = operatorInfo.getInputType();
int[] keyColumns = getKeyColumns(0);
-
- if (!(inputType instanceof CompositeType) && (keyColumns.length > 0 || groupOrder != null)) {
- throw new InvalidProgramException("Grouping or group-sorting is only possible on composite type.");
- }
-
int[] sortColumns = keyColumns;
boolean[] sortOrderings = new boolean[sortColumns.length];
@@ -172,19 +178,16 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
sortOrderings = ArrayUtils.addAll(sortOrderings, groupOrder.getFieldSortDirections());
}
- if (inputType instanceof CompositeType) {
- if(sortColumns.length == 0) { // => all reduce. No comparator
- Preconditions.checkArgument(sortOrderings.length == 0);
- } else {
- final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0, executionConfig);
-
- Collections.sort(inputData, new Comparator<IN>() {
- @Override
- public int compare(IN o1, IN o2) {
- return sortComparator.compare(o1, o2);
- }
- });
- }
+ if(sortColumns.length == 0) { // => all reduce. No comparator
+ Preconditions.checkArgument(sortOrderings.length == 0);
+ } else {
+ final TypeComparator<IN> sortComparator = getTypeComparator(inputType, sortColumns, sortOrderings, executionConfig);
+ Collections.sort(inputData, new Comparator<IN>() {
+ @Override
+ public int compare(IN o1, IN o2) {
+ return sortComparator.compare(o1, o2);
+ }
+ });
}
FunctionUtils.setFunctionRuntimeContext(function, ctx);
@@ -205,7 +208,7 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
} else {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
boolean[] keyOrderings = new boolean[keyColumns.length];
- final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
+ final TypeComparator<IN> comparator = getTypeComparator(inputType, keyColumns, keyOrderings, executionConfig);
ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index a2cde07..ee233e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -274,24 +274,33 @@ public abstract class Keys<T> {
* Create ExpressionKeys from String-expressions
*/
public ExpressionKeys(String[] expressionsIn, TypeInformation<T> type) {
- if(!(type instanceof CompositeType<?>)) {
- throw new IllegalArgumentException("Key expressions are only supported on POJO types and Tuples. "
- + "A type is considered a POJO if all its fields are public, or have both getters and setters defined");
- }
- CompositeType<T> cType = (CompositeType<T>) type;
-
- String[] expressions = removeDuplicates(expressionsIn);
- if(expressionsIn.length != expressions.length) {
- LOG.warn("The key expressions contained duplicates. They are now unique");
- }
- // extract the keys on their flat position
- keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
- for (int i = 0; i < expressions.length; i++) {
- List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
- if(keys.size() == 0) {
- throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
+ Preconditions.checkNotNull(expressionsIn, "Field expression cannot be null.");
+
+ if (type instanceof AtomicType) {
+ if (!type.isKeyType()) {
+ throw new InvalidProgramException("This type (" + type + ") cannot be used as key.");
+ } else if (expressionsIn.length != 1 || !(Keys.ExpressionKeys.SELECT_ALL_CHAR.equals(expressionsIn[0]) || Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA.equals(expressionsIn[0]))) {
+ throw new IllegalArgumentException("Field expression for atomic type must be equal to '*' or '_'.");
+ }
+
+ keyFields = new ArrayList<FlatFieldDescriptor>(1);
+ keyFields.add(new FlatFieldDescriptor(0, type));
+ } else {
+ CompositeType<T> cType = (CompositeType<T>) type;
+
+ String[] expressions = removeDuplicates(expressionsIn);
+ if(expressionsIn.length != expressions.length) {
+ LOG.warn("The key expressions contained duplicates. They are now unique");
+ }
+ // extract the keys on their flat position
+ keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
+ for (int i = 0; i < expressions.length; i++) {
+ List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
+ if(keys.size() == 0) {
+ throw new IllegalArgumentException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
+ }
+ keyFields.addAll(keys);
}
- keyFields.addAll(keys);
}
}
@@ -410,7 +419,7 @@ public abstract class Keys<T> {
return Arrays.copyOfRange(fields, 0, k+1);
}
}
-
+
public static class IncompatibleKeysException extends Exception {
private static final long serialVersionUID = 1L;
public static final String SIZE_MISMATCH_MESSAGE = "The number of specified keys is different.";
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 60754e6..f32f6a9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -18,26 +18,26 @@
package org.apache.flink.api.java.operator;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operator.JoinOperatorTest.CustomType;
+
+import java.util.ArrayList;
+import java.util.List;
import static org.junit.Assert.assertTrue;
@@ -181,6 +181,78 @@ public class CoGroupOperatorTest {
// should not work, cogroup key non-existent
ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
}
+
+ @Test
+ public void testCoGroupKeyAtomicExpression1() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 1);
+
+ ds1.coGroup(ds2).where("myInt").equalTo("*");
+ }
+
+ @Test
+ public void testCoGroupKeyAtomicExpression2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 1);
+ DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+ ds1.coGroup(ds2).where("*").equalTo("myInt");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testCoGroupKeyAtomicInvalidExpression1() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 1);
+ DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+ ds1.coGroup(ds2).where("*", "invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testCoGroupKeyAtomicInvalidExpression2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 1);
+ DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+ ds1.coGroup(ds2).where("invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testCoGroupKeyAtomicInvalidExpression3() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 1);
+
+ ds1.coGroup(ds2).where("myInt").equalTo("invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testCoGroupKeyAtomicInvalidExpression4() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 1);
+
+ ds1.coGroup(ds2).where("myInt").equalTo("*", "invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testCoGroupKeyAtomicInvalidExpression5() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<ArrayList<Integer>> ds1 = env.fromElements(new ArrayList<Integer>());
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+ ds1.coGroup(ds2).where("*");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testCoGroupKeyAtomicInvalidExpression6() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+ DataSet<ArrayList<Integer>> ds2 = env.fromElements(new ArrayList<Integer>());
+
+ ds1.coGroup(ds2).where("*").equalTo("*");
+ }
@Test
public void testCoGroupKeyExpressions1Nested() {
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 314695f..b3922b3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -143,7 +143,7 @@ public class GroupingTest {
}
}
- @Test(expected = IllegalArgumentException.class)
+ @Test(expected = InvalidProgramException.class)
public void testGroupByKeyExpressions2() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -551,6 +551,38 @@ public class GroupingTest {
}, Order.ASCENDING);
}
+ @Test
+ public void testGroupingAtomicType() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> dataSet = env.fromElements(0, 1, 1, 2, 0, 0);
+
+ dataSet.groupBy("*");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testGroupAtomicTypeWithInvalid1() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> dataSet = env.fromElements(0, 1, 2, 3);
+
+ dataSet.groupBy("*", "invalidField");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testGroupAtomicTypeWithInvalid2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> dataSet = env.fromElements(0, 1, 2, 3);
+
+ dataSet.groupBy("invalidField");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testGroupAtomicTypeWithInvalid3() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<ArrayList<Integer>> dataSet = env.fromElements(new ArrayList<Integer>());
+
+ dataSet.groupBy("*");
+ }
+
public static class CustomType implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index f1aadca..be964cc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -585,6 +585,78 @@ public class JoinOperatorTest {
}
);
}
+
+ @Test
+ public void testJoinKeyAtomic1() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ds1.join(ds2).where("*").equalTo(0);
+ }
+
+ @Test
+ public void testJoinKeyAtomic2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+ ds1.join(ds2).where(0).equalTo("*");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testJoinKeyInvalidAtomic1() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ds1.join(ds2).where("*", "invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testJoinKeyInvalidAtomic2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+ ds1.join(ds2).where(0).equalTo("*", "invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testJoinKeyInvalidAtomic3() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ ds1.join(ds2).where("invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testJoinKeyInvalidAtomic4() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+ ds1.join(ds2).where(0).equalTo("invalidKey");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testJoinKeyInvalidAtomic5() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<ArrayList<Integer>> ds1 = env.fromElements(new ArrayList<Integer>());
+ DataSet<Integer> ds2 = env.fromElements(0, 0, 0);
+
+ ds1.join(ds2).where("*").equalTo("*");
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testJoinKeyInvalidAtomic6() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 0, 0);
+ DataSet<ArrayList<Integer>> ds2 = env.fromElements(new ArrayList<Integer>());
+
+ ds1.join(ds2).where("*").equalTo("*");
+ }
@Test
public void testJoinProjection1() {
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
index 5fdf3dd..a685ff4 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
@@ -41,7 +41,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
@@ -305,10 +304,6 @@ public class JavaApiPostPass implements OptimizerPostPass {
}
private static <T1 extends Tuple, T2 extends Tuple> TypePairComparatorFactory<T1,T2> createPairComparator(TypeInformation<?> typeInfo1, TypeInformation<?> typeInfo2) {
- if (!(typeInfo1.isTupleType() || typeInfo1 instanceof PojoTypeInfo) && (typeInfo2.isTupleType() || typeInfo2 instanceof PojoTypeInfo)) {
- throw new RuntimeException("The runtime currently supports only keyed binary operations (such as joins) on tuples and POJO types.");
- }
-
// @SuppressWarnings("unchecked")
// TupleTypeInfo<T1> info1 = (TupleTypeInfo<T1>) typeInfo1;
// @SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 99f568e..84c05d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,10 +18,6 @@
package org.apache.flink.test.javaApiOperators;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.java.DataSet;
@@ -47,6 +43,10 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class CoGroupITCase extends MultipleProgramsTestBase {
@@ -488,6 +488,36 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
"-1,30000,Flink\n";
}
+ @Test
+ public void testCoGroupWithAtomicType1() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> ds2 = env.fromElements(0, 1, 2);
+
+ DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where(0).equalTo("*").with(new CoGroupAtomic1());
+
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "(1,1,Hi)\n" +
+ "(2,2,Hello)";
+ }
+
+ @Test
+ public void testCoGroupWithAtomicType2() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds1 = env.fromElements(0, 1, 2);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGroupDs = ds1.coGroup(ds2).where("*").equalTo(0).with(new CoGroupAtomic2());
+
+ coGroupDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "(1,1,Hi)\n" +
+ "(2,2,Hello)";
+ }
+
public static class KeySelector1 implements KeySelector<POJO, Long> {
private static final long serialVersionUID = 1L;
@@ -719,4 +749,48 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
}
}
}
+
+ public static class CoGroupAtomic1 implements CoGroupFunction<Tuple3<Integer, Long, String>, Integer, Tuple3<Integer, Long, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(Iterable<Tuple3<Integer, Long, String>> first, Iterable<Integer> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ List<Integer> ints = new ArrayList<Integer>();
+
+ for (Integer i : second) {
+ ints.add(i);
+ }
+
+ for (Tuple3<Integer, Long, String> t : first) {
+ for (Integer i : ints) {
+ if (t.f0.equals(i)) {
+ out.collect(t);
+ }
+ }
+ }
+ }
+ }
+
+ public static class CoGroupAtomic2 implements CoGroupFunction<Integer, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(Iterable<Integer> first, Iterable<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
+ List<Integer> ints = new ArrayList<Integer>();
+
+ for (Integer i : first) {
+ ints.add(i);
+ }
+
+ for (Tuple3<Integer, Long, String> t : second) {
+ for (Integer i : ints) {
+ if (t.f0.equals(i)) {
+ out.collect(t);
+ }
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 9eb9a37..cf6b529 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -18,21 +18,20 @@
package org.apache.flink.test.javaApiOperators;
-import java.util.Collection;
-import java.util.Iterator;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
@@ -49,10 +48,13 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import scala.math.BigInt;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class GroupReduceITCase extends MultipleProgramsTestBase {
@@ -1063,6 +1065,26 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
}
+ @Test
+ // Verifies FLINK-703: groupBy("*") groups an atomic (non-composite) type by
+ // the complete element. Deduplicates {1, 1, 2, 3, 4} by emitting one element
+ // per group.
+ public void testGroupReduceWithAtomicValue() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Integer> ds = env.fromElements(1, 1, 2, 3, 4);
+ DataSet<Integer> reduceDs = ds.groupBy("*").reduceGroup(new GroupReduceFunction<Integer, Integer>() {
+ @Override
+ public void reduce(Iterable<Integer> values, Collector<Integer> out) throws Exception {
+ // Emit a single representative per group (all values in a group are equal).
+ out.collect(values.iterator().next());
+ }
+ });
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ expected = "1\n" +
+ "2\n" +
+ "3\n" +
+ "4";
+ }
+
public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(
http://git-wip-us.apache.org/repos/asf/flink/blob/30a74c76/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 0080fb1..fe436a3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -663,6 +663,37 @@ public class JoinITCase extends MultipleProgramsTestBase {
"((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
}
+ @Test
+ // Verifies FLINK-703: equalTo("*") uses the complete element of an atomic
+ // Integer data set as the join key on the RIGHT side, joined against tuple
+ // field 0 on the left.
+ public void testJoinWithAtomicType1() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> ds2 = env.fromElements(1, 2);
+
+ DataSet<Tuple2<Tuple3<Integer, Long, String>, Integer>> joinDs = ds1.join(ds2).where(0).equalTo("*");
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "(1,1,Hi),1\n" +
+ "(2,2,Hello),2";
+ }
+
+ @Test
+ // Verifies FLINK-703: where("*") uses the complete element of an atomic
+ // Integer data set as the join key on the LEFT side, joined against tuple
+ // field 0 on the right. Mirror of testJoinWithAtomicType1.
+ // NOTE: the @Test annotation was missing in the original hunk, so JUnit
+ // silently skipped this test and the left-side atomic-key path went
+ // unverified.
+ public void testJoinWithAtomicType2() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Integer> ds1 = env.fromElements(1, 2);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+
+ DataSet<Tuple2<Integer, Tuple3<Integer, Long, String>>> joinDs = ds1.join(ds2).where("*").equalTo(0);
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ expected = "1,(1,1,Hi)\n" +
+ "2,(2,2,Hello)";
+ }
+
public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
@Override