You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:12 UTC
[06/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 2a306ca..582077e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -42,8 +42,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
@@ -59,6 +59,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
+
/**
* Test specialized hash join that keeps the build side data (in memory and on hard disk)
* This is used for iterative tasks.
@@ -203,7 +206,7 @@ public class ReusingReOpenableHashTableITCase {
private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
// collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchSecondTupleFields(ReusingHashMatchIteratorITCase.collectTupleData(buildInput), ReusingHashMatchIteratorITCase.collectTupleData(probeInput));
+ final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput));
final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
@@ -224,11 +227,11 @@ public class ReusingReOpenableHashTableITCase {
probeInput.reset();
// compare with iterator values
- ReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildFirstReOpenableHashMatchIterator<>(
+ ReusingBuildFirstReOpenableHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstReOpenableHashJoinIterator<>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
+ this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
iterator.open();
// do first join with both inputs
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 4c5a07e..4f18494 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
@@ -180,11 +180,11 @@ public class HashVsSortMiniBenchmark {
long start = System.nanoTime();
// compare with iterator values
- final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildFirstHashMatchIterator<>(
+ final ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashJoinIterator<>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
- this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
+ this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
iterator.open();
@@ -219,11 +219,11 @@ public class HashVsSortMiniBenchmark {
long start = System.nanoTime();
// compare with iterator values
- ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildSecondHashMatchIterator<>(
+ ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashJoinIterator<>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
- this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
+ this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
iterator.open();
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index dbae59d..7c39085 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -932,7 +932,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* use. If null is given as the join strategy, then the optimizer will pick the strategy.
*/
def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
- new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.FULL_OUTER)
+ strategy match {
+ case null | JoinHint.OPTIMIZER_CHOOSES | // null is documented above as "let the optimizer pick"; must not throw
+ JoinHint.REPARTITION_SORT_MERGE =>
+ new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.FULL_OUTER)
+ case _ =>
+ throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: " + strategy)
+ }
/**
* An outer join on the left side.
@@ -960,7 +966,15 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* @see #fullOuterJoin
*/
def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
- new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.LEFT_OUTER)
+ strategy match {
+ case null | JoinHint.OPTIMIZER_CHOOSES | // null was passed through before this change (optimizer picks); keep accepting it
+ JoinHint.REPARTITION_SORT_MERGE |
+ JoinHint.REPARTITION_HASH_SECOND |
+ JoinHint.BROADCAST_HASH_SECOND =>
+ new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.LEFT_OUTER)
+ case _ =>
+ throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: " + strategy)
+ }
/**
* An outer join on the right side.
@@ -988,7 +1002,15 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* @see #fullOuterJoin
*/
def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
- new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.RIGHT_OUTER)
+ strategy match {
+ case null | JoinHint.OPTIMIZER_CHOOSES | // null was passed through before this change (optimizer picks); keep accepting it
+ JoinHint.REPARTITION_SORT_MERGE |
+ JoinHint.REPARTITION_HASH_FIRST |
+ JoinHint.BROADCAST_HASH_FIRST =>
+ new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.RIGHT_OUTER)
+ case _ =>
+ throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: " + strategy)
+ }
// --------------------------------------------------------------------------------------------
// Co-Group
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
index ebd1ddf..c2dca66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.javaApiOperators;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
@@ -50,7 +51,21 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testUDFLeftOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
+ public void testLeftOuterJoin1() throws Exception {
+ testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
+ }
+
+ @Test
+ public void testLeftOuterJoin2() throws Exception {
+ testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+ }
+
+ @Test
+ public void testLeftOuterJoin3() throws Exception {
+ testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
+ }
+
+ private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
/*
* UDF Join on tuples with key field positions
*/
@@ -60,7 +75,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
DataSet<Tuple2<String, String>> joinDs =
- ds1.leftOuterJoin(ds2)
+ ds1.leftOuterJoin(ds2, hint)
.where(0)
.equalTo(0)
.with(new T3T5FlatJoin());
@@ -76,7 +91,21 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testUDFRightOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
+ public void testRightOuterJoin1() throws Exception {
+ testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
+ }
+
+ @Test
+ public void testRightOuterJoin2() throws Exception {
+ testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
+ }
+
+ @Test
+ public void testRightOuterJoin3() throws Exception {
+ testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
+ }
+
+ private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
/*
* UDF Join on tuples with key field positions
*/
@@ -86,7 +115,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
DataSet<Tuple2<String, String>> joinDs =
- ds1.rightOuterJoin(ds2)
+ ds1.rightOuterJoin(ds2, hint)
.where(1)
.equalTo(1)
.with(new T3T5FlatJoin());
@@ -102,7 +131,11 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testUDFFullOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
+ public void testFullOuterJoin1() throws Exception {
+ testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
+ }
+
+ private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
/*
* UDF Join on tuples with key field positions
*/
@@ -112,7 +145,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
DataSet<Tuple2<String, String>> joinDs =
- ds1.fullOuterJoin(ds2)
+ ds1.fullOuterJoin(ds2, hint)
.where(0)
.equalTo(2)
.with(new T3T5FlatJoin());
@@ -128,7 +161,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
+ public void testJoinOnTuplesWithCompositeKeyPositions() throws Exception {
/*
* UDF Join on tuples with multiple key field positions
*/
@@ -183,7 +216,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testJoinOnACustomTypeInputWithKeyExtractorAndATupleInputWithKeyFieldSelector() throws Exception {
+ public void testJoinWithMixedKeyTypes1() throws Exception {
/*
* Join on a tuple input with key field selector and a custom type input with key extractor
*/
@@ -218,7 +251,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
@Test
- public void testJoinOnATupleInputWithKeyFieldSelectorAndACustomTypeInputWithKeyExtractor()
+ public void testJoinWithMixedKeyTypes2()
throws Exception {
/*
* Join on a tuple input with key field selector and a custom type input with key extractor
@@ -252,7 +285,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testUDFJoinOnTuplesWithTupleReturningKeySelectors() throws Exception {
+ public void testJoinWithTupleReturningKeySelectors() throws Exception {
/*
* UDF Join on tuples with tuple-returning key selectors
*/
@@ -296,7 +329,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testJoinNestedPojoAgainstTupleSelectedUsingString() throws Exception {
+ public void testJoinWithNestedKeyExpression1() throws Exception {
/*
* Join nested pojo against tuple (selected using a string)
*/
@@ -308,7 +341,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
ds1.fullOuterJoin(ds2)
.where("nestedPojo.longNumber")
.equalTo("f6")
- .with(new ProjectBothFunction());
+ .with(new ProjectBothFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>());
List<Tuple2<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>>> result = joinDs.collect();
@@ -320,7 +353,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testJoinNestedPojoAgainstTupleSelectedUsingInteger() throws Exception {
+ public void testJoinWithNestedKeyExpression2() throws Exception {
/*
* Join nested pojo against tuple (selected as an integer)
*/
@@ -344,7 +377,7 @@ public class OuterJoinITCase extends MultipleProgramsTestBase {
}
@Test
- public void testSelectingMultipleFieldsUsingExpressionLanguage() throws Exception {
+ public void testJoinWithCompositeKeyExpressions() throws Exception {
/*
* selecting multiple fields using expression language
*/