You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/06/20 13:36:15 UTC
flink git commit: [FLINK-4078] [dataSet] Introduce closure cleaning
in CoGroup.where() and equalTo()
Repository: flink
Updated Branches:
refs/heads/master 6a1144e5a -> 78d3c61c3
[FLINK-4078] [dataSet] Introduce closure cleaning in CoGroup.where() and equalTo()
This closes #2116.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78d3c61c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78d3c61c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78d3c61c
Branch: refs/heads/master
Commit: 78d3c61c3dbf37ef61b37e0b5f1a90fc55d2c586
Parents: 6a1144e
Author: Stefan Richter <st...@gmail.com>
Authored: Thu Jun 16 12:11:32 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 20 15:35:57 2016 +0200
----------------------------------------------------------------------
.../api/java/operators/CoGroupOperator.java | 4 +-
.../test/javaApiOperators/CoGroupITCase.java | 115 ++++++++++++++++++-
2 files changed, 116 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/78d3c61c/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 45ea385..3c838cc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -455,7 +455,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
*/
public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());
- return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(keyExtractor, input1.getType(), keyType));
+ return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(input1.clean(keyExtractor), input1.getType(), keyType));
}
// ----------------------------------------------------------------------------------------
@@ -520,7 +520,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
*/
public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());
- return createCoGroupOperator(new SelectorFunctionKeys<>(keyExtractor, input2.getType(), keyType));
+ return createCoGroupOperator(new SelectorFunctionKeys<>(input1.clean(keyExtractor), input2.getType(), keyType));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/78d3c61c/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 5b7caa7..4ca0f3e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.javaApiOperators;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -308,7 +309,7 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
}
@Test
- public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception {
+ public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
/*
* CoGroup with multiple key fields
*/
@@ -334,6 +335,118 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expected);
}
+ @Test
+ public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
+ /*
+ * CoGroup with multiple key fields, test working closure cleaner for inner classes
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+ Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).
+ equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ }).
+ with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple3<Integer, Long, String>> second,
+ Collector<Tuple3<Integer, Long, String>> out)
+ {
+ List<String> strs = new ArrayList<String>();
+
+ for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+ strs.add(t.f3);
+ }
+
+ for(Tuple3<Integer, Long, String> t : second) {
+ for(String s : strs) {
+ out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+ }
+ }
+ }
+ });
+
+ List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+ String expected = "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
+ /*
+ * CoGroup with multiple key fields, test that disabling closure cleaner leads to an exception when using inner
+ * classes.
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableClosureCleaner();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+ boolean correctExceptionTriggered = false;
+ try {
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+ Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).
+ equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ }).
+ with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple3<Integer, Long, String>> second,
+ Collector<Tuple3<Integer, Long, String>> out) {
+ List<String> strs = new ArrayList<String>();
+
+ for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+ strs.add(t.f3);
+ }
+
+ for (Tuple3<Integer, Long, String> t : second) {
+ for (String s : strs) {
+ out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+ }
+ }
+ }
+ });
+ } catch (InvalidProgramException ex) {
+ correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException);
+ }
+ Assert.assertTrue(correctExceptionTriggered);
+
+ }
+
public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
Tuple2<Integer, Long>> {
private static final long serialVersionUID = 1L;