You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text version.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:12 UTC

[06/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 2a306ca..582077e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
@@ -59,6 +59,9 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
+
 /**
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
@@ -203,7 +206,7 @@ public class ReusingReOpenableHashTableITCase {
 	
 	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
 		// collect expected data
-		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchSecondTupleFields(ReusingHashMatchIteratorITCase.collectTupleData(buildInput), ReusingHashMatchIteratorITCase.collectTupleData(probeInput));
+		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput));
 		
 		final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
 		final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
@@ -224,11 +227,11 @@ public class ReusingReOpenableHashTableITCase {
 		probeInput.reset();
 
 		// compare with iterator values
-		ReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-				new ReusingBuildFirstReOpenableHashMatchIterator<>(
+		ReusingBuildFirstReOpenableHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstReOpenableHashJoinIterator<>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, true);
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
 		
 		iterator.open();
 		// do first join with both inputs

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 4c5a07e..4f18494 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
@@ -180,11 +180,11 @@ public class HashVsSortMiniBenchmark {
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-					new ReusingBuildFirstHashMatchIterator<>(
+			final ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildFirstHashJoinIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
+							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
 			
 			iterator.open();
 			
@@ -219,11 +219,11 @@ public class HashVsSortMiniBenchmark {
 			long start = System.nanoTime();
 			
 			// compare with iterator values
-			ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
-					new ReusingBuildSecondHashMatchIterator<>(
+			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					new ReusingBuildSecondHashJoinIterator<>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
+						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
 			
 			iterator.open();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index dbae59d..7c39085 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -932,7 +932,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * use. If null is given as the join strategy, then the optimizer will pick the strategy.
    */
   def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
-    new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.FULL_OUTER)
+    strategy match {
+      case JoinHint.OPTIMIZER_CHOOSES |
+           JoinHint.REPARTITION_SORT_MERGE =>
+        new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.FULL_OUTER)
+      case _ =>
+        throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: " + strategy)
+    }
 
   /**
    * An outer join on the left side.
@@ -960,7 +966,15 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @see #fullOuterJoin
    */
   def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
-    new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.LEFT_OUTER)
+    strategy match {
+      case JoinHint.OPTIMIZER_CHOOSES |
+           JoinHint.REPARTITION_SORT_MERGE |
+           JoinHint.REPARTITION_HASH_SECOND |
+      JoinHint.BROADCAST_HASH_SECOND =>
+        new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.LEFT_OUTER)
+      case _ =>
+        throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: " + strategy)
+    }
 
   /**
    * An outer join on the right side.
@@ -988,7 +1002,15 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * @see #fullOuterJoin
    */
   def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
-    new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.RIGHT_OUTER)
+    strategy match {
+      case JoinHint.OPTIMIZER_CHOOSES |
+           JoinHint.REPARTITION_SORT_MERGE |
+           JoinHint.REPARTITION_HASH_FIRST |
+      JoinHint.BROADCAST_HASH_FIRST =>
+        new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.RIGHT_OUTER)
+      case _ =>
+        throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: " + strategy)
+    }
 
   // --------------------------------------------------------------------------------------------
   //  Co-Group

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
index ebd1ddf..c2dca66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.javaApiOperators;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -50,7 +51,21 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testUDFLeftOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
+	public void testLeftOuterJoin1() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
+	}
+
+	@Test
+	public void testLeftOuterJoin2() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test
+	public void testLeftOuterJoin3() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
+	private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
 		 */
@@ -60,7 +75,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
-				ds1.leftOuterJoin(ds2)
+				ds1.leftOuterJoin(ds2, hint)
 						.where(0)
 						.equalTo(0)
 						.with(new T3T5FlatJoin());
@@ -76,7 +91,21 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testUDFRightOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
+	public void testRightOuterJoin1() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
+	}
+
+	@Test
+	public void testRightOuterJoin2() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
+	}
+
+	@Test
+	public void testRightOuterJoin3() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+	private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
 		 */
@@ -86,7 +115,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
-				ds1.rightOuterJoin(ds2)
+				ds1.rightOuterJoin(ds2, hint)
 						.where(1)
 						.equalTo(1)
 						.with(new T3T5FlatJoin());
@@ -102,7 +131,11 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testUDFFullOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
+	public void testFullOuterJoin1() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
+	}
+
+	private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
 		 */
@@ -112,7 +145,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
-				ds1.fullOuterJoin(ds2)
+				ds1.fullOuterJoin(ds2, hint)
 						.where(0)
 						.equalTo(2)
 						.with(new T3T5FlatJoin());
@@ -128,7 +161,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
+	public void testJoinOnTuplesWithCompositeKeyPositions() throws Exception {
 		/*
 		 * UDF Join on tuples with multiple key field positions
 		 */
@@ -183,7 +216,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector() throws Exception {
+	public void testJoinWithMixedKeyTypes1() throws Exception {
 		/*
 		 * Join on a tuple input with key field selector and a custom type input with key extractor
 		 */
@@ -218,7 +251,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 
 
 	@Test
-	public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+	public void testJoinWithMixedKeyTypes2()
 			throws Exception {
 		/*
 		 * Join on a tuple input with key field selector and a custom type input with key extractor
@@ -252,7 +285,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception {
+	public void testJoinWithTupleReturningKeySelectors() throws Exception {
 		/*
 		 * UDF Join on tuples with tuple-returning key selectors
 		 */
@@ -296,7 +329,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception {
+	public void testJoinWithNestedKeyExpression1() throws Exception {
 		/*
 		 * Join nested pojo against tuple (selected using a string)
 		 */
@@ -308,7 +341,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 				ds1.fullOuterJoin(ds2)
 						.where("nestedPojo.longNumber")
 						.equalTo("f6")
-						.with(new ProjectBothFunction());
+						.with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
 
 		List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
 
@@ -320,7 +353,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exception {
+	public void testJoinWithNestedKeyExpression2() throws Exception {
 		/*
 		 * Join nested pojo against tuple (selected as an integer)
 		 */
@@ -344,7 +377,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
+	public void testJoinWithCompositeKeyExpressions() throws Exception {
 		/*
 		 * selecting multiple fields using expression language
 		 */