You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/06/20 13:36:15 UTC

flink git commit: [FLINK-4078] [dataSet] Introduce closure cleaning in CoGroup.where() and equalTo()

Repository: flink
Updated Branches:
  refs/heads/master 6a1144e5a -> 78d3c61c3


[FLINK-4078] [dataSet] Introduce closure cleaning in CoGroup.where() and equalTo()

This closes #2116.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78d3c61c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78d3c61c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78d3c61c

Branch: refs/heads/master
Commit: 78d3c61c3dbf37ef61b37e0b5f1a90fc55d2c586
Parents: 6a1144e
Author: Stefan Richter <st...@gmail.com>
Authored: Thu Jun 16 12:11:32 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Jun 20 15:35:57 2016 +0200

----------------------------------------------------------------------
 .../api/java/operators/CoGroupOperator.java     |   4 +-
 .../test/javaApiOperators/CoGroupITCase.java    | 115 ++++++++++++++++++-
 2 files changed, 116 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78d3c61c/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 45ea385..3c838cc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -455,7 +455,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 */
 		public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
 			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());
-			return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(keyExtractor, input1.getType(), keyType));
+			return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(input1.clean(keyExtractor), input1.getType(), keyType));
 		}
 
 		// ----------------------------------------------------------------------------------------
@@ -520,7 +520,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			 */
 			public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
 				TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());
-				return createCoGroupOperator(new SelectorFunctionKeys<>(keyExtractor, input2.getType(), keyType));
+				return createCoGroupOperator(new SelectorFunctionKeys<>(input1.clean(keyExtractor), input2.getType(), keyType));
 			}
 
 			/**

http://git-wip-us.apache.org/repos/asf/flink/blob/78d3c61c/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 5b7caa7..4ca0f3e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -308,7 +309,7 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testCoGroupWithMultipleKeyFieldsWithKeyExtractor() throws Exception {
+	public void testCoGroupWithMultipleKeyFieldsWithStaticClassKeyExtractor() throws Exception {
 		/*
 		 * CoGroup with multiple key fields
 		 */
@@ -334,6 +335,118 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expected);
 	}
 
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithClosureCleaner() throws Exception {
+		/*
+		 * CoGroup with multiple key fields, test working closure cleaner for inner classes
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+				where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+						Tuple2<Integer, Long>>() {
+					@Override
+					public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+						return new Tuple2<Integer, Long>(t.f0, t.f4);
+					}
+				}).
+				equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+
+					@Override
+					public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+						return new Tuple2<Integer, Long>(t.f0, t.f1);
+					}
+				}).
+				with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+					@Override
+					public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+					                    Iterable<Tuple3<Integer, Long, String>> second,
+					                    Collector<Tuple3<Integer, Long, String>> out)
+					{
+						List<String> strs = new ArrayList<String>();
+
+						for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+							strs.add(t.f3);
+						}
+
+						for(Tuple3<Integer, Long, String> t : second) {
+							for(String s : strs) {
+								out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+							}
+						}
+					}
+				});
+
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testCoGroupWithMultipleKeyFieldsWithInnerClassKeyExtractorWithoutClosureCleaner() throws Exception {
+		/*
+		 * CoGroup with multiple key fields, test that disabling closure cleaner leads to an exception when using inner
+		 * classes.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableClosureCleaner();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+		boolean correctExceptionTriggered = false;
+		try {
+			DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+					where(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>,
+							Tuple2<Integer, Long>>() {
+						@Override
+						public Tuple2<Integer, Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) throws Exception {
+							return new Tuple2<Integer, Long>(t.f0, t.f4);
+						}
+					}).
+					equalTo(new KeySelector<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>>() {
+
+						@Override
+						public Tuple2<Integer, Long> getKey(Tuple3<Integer, Long, String> t) {
+							return new Tuple2<Integer, Long>(t.f0, t.f1);
+						}
+					}).
+					with(new CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+						@Override
+						public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+						                    Iterable<Tuple3<Integer, Long, String>> second,
+						                    Collector<Tuple3<Integer, Long, String>> out) {
+							List<String> strs = new ArrayList<String>();
+
+							for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+								strs.add(t.f3);
+							}
+
+							for (Tuple3<Integer, Long, String> t : second) {
+								for (String s : strs) {
+									out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+								}
+							}
+						}
+					});
+		} catch (InvalidProgramException ex) {
+			correctExceptionTriggered = (ex.getCause() instanceof java.io.NotSerializableException);
+		}
+		Assert.assertTrue(correctExceptionTriggered);
+
+	}
+
 	public static class KeySelector7 implements KeySelector<Tuple5<Integer,Long,Integer,String,Long>,
 	Tuple2<Integer, Long>> {
 		private static final long serialVersionUID = 1L;