Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:16 UTC
[10/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.
[FLINK-2107] Add hash-based strategies for left and right outer joins.
This closes #1262
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5671c77c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5671c77c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5671c77c
Branch: refs/heads/master
Commit: 5671c77c3afbb84ed89427bbab9b1355d0b1f8cc
Parents: c900577
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 15 10:58:58 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 19 16:17:42 2015 +0200
----------------------------------------------------------------------
docs/apis/dataset_transformations.md | 58 +-
.../java/org/apache/flink/api/java/DataSet.java | 29 +-
.../operator/FullOuterJoinOperatorTest.java | 250 +++++
.../operator/LeftOuterJoinOperatorTest.java | 251 +++++
.../operator/RightOuterJoinOperatorTest.java | 250 +++++
.../flink/optimizer/costs/CostEstimator.java | 2 +
.../flink/optimizer/dag/OuterJoinNode.java | 97 +-
.../HashLeftOuterJoinBuildSecondDescriptor.java | 68 ++
.../HashRightOuterJoinBuildFirstDescriptor.java | 68 ++
.../AbstractCachedBuildSideJoinDriver.java | 28 +-
.../operators/AbstractOuterJoinDriver.java | 11 +-
.../flink/runtime/operators/DriverStrategy.java | 8 +-
.../runtime/operators/FullOuterJoinDriver.java | 6 +-
.../flink/runtime/operators/JoinDriver.java | 20 +-
.../runtime/operators/LeftOuterJoinDriver.java | 28 +-
.../runtime/operators/RightOuterJoinDriver.java | 28 +-
.../operators/hash/HashJoinIteratorBase.java | 56 ++
.../operators/hash/HashMatchIteratorBase.java | 56 --
.../NonReusingBuildFirstHashJoinIterator.java | 166 ++++
.../NonReusingBuildFirstHashMatchIterator.java | 152 ---
...ingBuildFirstReOpenableHashJoinIterator.java | 87 ++
...ngBuildFirstReOpenableHashMatchIterator.java | 86 --
.../NonReusingBuildSecondHashJoinIterator.java | 164 ++++
.../NonReusingBuildSecondHashMatchIterator.java | 150 ---
...ngBuildSecondReOpenableHashJoinIterator.java | 88 ++
...gBuildSecondReOpenableHashMatchIterator.java | 86 --
.../hash/ReusingBuildFirstHashJoinIterator.java | 168 ++++
.../ReusingBuildFirstHashMatchIterator.java | 154 ---
...ingBuildFirstReOpenableHashJoinIterator.java | 87 ++
...ngBuildFirstReOpenableHashMatchIterator.java | 86 --
.../ReusingBuildSecondHashJoinIterator.java | 166 ++++
.../ReusingBuildSecondHashMatchIterator.java | 152 ---
...ngBuildSecondReOpenableHashJoinIterator.java | 87 ++
...gBuildSecondReOpenableHashMatchIterator.java | 85 --
.../AbstractOuterJoinTaskExternalITCase.java | 23 +-
.../operators/AbstractOuterJoinTaskTest.java | 35 +-
.../FullOuterJoinTaskExternalITCase.java | 7 +-
.../operators/FullOuterJoinTaskTest.java | 7 +-
.../LeftOuterJoinTaskExternalITCase.java | 42 +-
.../operators/LeftOuterJoinTaskTest.java | 230 ++++-
.../RightOuterJoinTaskExternalITCase.java | 44 +-
.../operators/RightOuterJoinTaskTest.java | 223 ++++-
.../hash/NonReusingHashJoinIteratorITCase.java | 947 +++++++++++++++++++
.../hash/NonReusingHashMatchIteratorITCase.java | 766 ---------------
.../NonReusingReOpenableHashTableITCase.java | 12 +-
.../hash/ReusingHashJoinIteratorITCase.java | 709 ++++++++++++++
.../hash/ReusingHashMatchIteratorITCase.java | 768 ---------------
.../hash/ReusingReOpenableHashTableITCase.java | 15 +-
.../operators/util/HashVsSortMiniBenchmark.java | 16 +-
.../org/apache/flink/api/scala/DataSet.scala | 28 +-
.../test/javaApiOperators/OuterJoinITCase.java | 61 +-
51 files changed, 4524 insertions(+), 2687 deletions(-)
----------------------------------------------------------------------
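
For context before the per-file diff: a minimal sketch of how the new hash-based strategies can be requested through the DataSet API once this commit is in place. The JoinHint constants and the leftOuterJoin/rightOuterJoin signatures are the ones shown in the diff below; the class name, sample data, key positions, and join functions are made up for illustration only.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Illustrative sketch only, not part of this commit.
public class OuterJoinHintSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> left = env.fromElements(
                new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));
        DataSet<Tuple2<Integer, String>> right = env.fromElements(
                new Tuple2<>(2, "x"), new Tuple2<>(3, "y"));

        // New with this commit: a left outer join may build the hash table from the
        // second (right) input, either after repartitioning or via broadcast.
        left.leftOuterJoin(right, JoinHint.REPARTITION_HASH_SECOND)
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public String join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                        // r is null for left rows without a matching right row
                        return l.f1 + "|" + (r == null ? "-" : r.f1);
                    }
                })
                .print();

        // Symmetrically, a right outer join may build the hash table from the first (left) input.
        left.rightOuterJoin(right, JoinHint.BROADCAST_HASH_FIRST)
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public String join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                        // l is null for right rows without a matching left row
                        return (l == null ? "-" : l.f1) + "|" + r.f1;
                    }
                })
                .print();
    }
}

Full outer joins are unchanged by this commit and still execute with the sort-merge strategy.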
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/docs/apis/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/dataset_transformations.md b/docs/apis/dataset_transformations.md
index cc7e742..feed121 100644
--- a/docs/apis/dataset_transformations.md
+++ b/docs/apis/dataset_transformations.md
@@ -1408,25 +1408,25 @@ Not supported.
The following hints are available:
-* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+* `OPTIMIZER_CHOOSES`: Equivalent to not giving a hint at all, leaves the choice to the system.
-* BROADCAST_HASH_FIRST: Broadcasts the first input and builds a hash table from it, which is
+* `BROADCAST_HASH_FIRST`: Broadcasts the first input and builds a hash table from it, which is
probed by the second input. A good strategy if the first input is very small.
-* BROADCAST_HASH_SECOND: Broadcasts the second input and builds a hash table from it, which is
+* `BROADCAST_HASH_SECOND`: Broadcasts the second input and builds a hash table from it, which is
probed by the first input. A good strategy if the second input is very small.
-* REPARTITION_HASH_FIRST: The system partitions (shuffles) each input (unless the input is already
+* `REPARTITION_HASH_FIRST`: The system partitions (shuffles) each input (unless the input is already
partitioned) and builds a hash table from the first input. This strategy is good if the first
input is smaller than the second, but both inputs are still large.
*Note:* This is the default fallback strategy that the system uses if no size estimates can be made
and no pre-existing partitionings and sort-orders can be re-used.
-* REPARTITION_HASH_SECOND: The system partitions (shuffles) each input (unless the input is already
+* `REPARTITION_HASH_SECOND`: The system partitions (shuffles) each input (unless the input is already
partitioned) and builds a hash table from the second input. This strategy is good if the second
input is smaller than the first, but both inputs are still large.
-* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+* `REPARTITION_SORT_MERGE`: The system partitions (shuffles) each input (unless the input is already
partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
already sorted.
@@ -1558,9 +1558,13 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
-DataSet<Tuple2<SomeType, AnotherType> result =
+DataSet<Tuple2<SomeType, AnotherType>> result1 =
input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
+
+DataSet<Tuple2<SomeType, AnotherType>> result2 =
+ input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
+ .where("id").equalTo("key");
~~~
</div>
@@ -1573,6 +1577,8 @@ val input2: DataSet[AnotherType] = // [...]
// hint that the second DataSet is very small
val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).where("id").equalTo("key")
+val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
+
~~~
</div>
@@ -1585,15 +1591,47 @@ Not supported.
</div>
</div>
-**NOTE:** Right now, outer joins can only be executed using the `REPARTITION_SORT_MERGE` strategy. Further execution strategies will be added in the future.
+The following hints are available.
-* OPTIMIZER_CHOOSES: Equivalent to not giving a hint at all, leaves the choice to the system.
+* `OPTIMIZER_CHOOSES`: Equivalent to not giving a hint at all, leaves the choice to the system.
-* REPARTITION_SORT_MERGE: The system partitions (shuffles) each input (unless the input is already
+* `BROADCAST_HASH_FIRST`: Broadcasts the first input and builds a hash table from it, which is
+ probed by the second input. A good strategy if the first input is very small.
+
+* `BROADCAST_HASH_SECOND`: Broadcasts the second input and builds a hash table from it, which is
+ probed by the first input. A good strategy if the second input is very small.
+
+* `REPARTITION_HASH_FIRST`: The system partitions (shuffles) each input (unless the input is already
+ partitioned) and builds a hash table from the first input. This strategy is good if the first
+ input is smaller than the second, but both inputs are still large.
+
+* `REPARTITION_HASH_SECOND`: The system partitions (shuffles) each input (unless the input is already
+ partitioned) and builds a hash table from the second input. This strategy is good if the second
+ input is smaller than the first, but both inputs are still large.
+
+* `REPARTITION_SORT_MERGE`: The system partitions (shuffles) each input (unless the input is already
partitioned) and sorts each input (unless it is already sorted). The inputs are joined by
a streamed merge of the sorted inputs. This strategy is good if one or both of the inputs are
already sorted.
+**NOTE:** Not all execution strategies are supported by every outer join type yet.
+
+* `LeftOuterJoin` supports:
+ * `OPTIMIZER_CHOOSES`
+ * `BROADCAST_HASH_SECOND`
+ * `REPARTITION_HASH_SECOND`
+ * `REPARTITION_SORT_MERGE`
+
+* `RightOuterJoin` supports:
+ * `OPTIMIZER_CHOOSES`
+ * `BROADCAST_HASH_FIRST`
+ * `REPARTITION_HASH_FIRST`
+ * `REPARTITION_SORT_MERGE`
+
+* `FullOuterJoin` supports:
+ * `OPTIMIZER_CHOOSES`
+ * `REPARTITION_SORT_MERGE`
+
### Cross
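
The strategy support listed above is enforced eagerly by the Java DataSet API (see the DataSet.java hunk below): an unsupported hint fails when the outer join is declared, before any plan is compiled. A small sketch of that behavior, again with a made-up class name and sample data:

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Illustrative sketch only, not part of this commit.
public class OuterJoinHintValidationSketch {

    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> left = env.fromElements(new Tuple2<>(1, "a"));
        DataSet<Tuple2<Integer, String>> right = env.fromElements(new Tuple2<>(1, "b"));

        try {
            // BROADCAST_HASH_FIRST would build the hash table from the outer (left) side,
            // which a left outer join cannot use, so the call is rejected right away.
            left.leftOuterJoin(right, JoinHint.BROADCAST_HASH_FIRST);
        } catch (InvalidProgramException expected) {
            // message: "Invalid JoinHint for LeftOuterJoin: BROADCAST_HASH_FIRST"
        }

        // Full outer joins still accept only OPTIMIZER_CHOOSES and REPARTITION_SORT_MERGE.
        left.fullOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE);
    }
}

The optimizer applies the same restrictions a second time in OuterJoinNode, throwing a CompilerException if an invalid hint/join-type combination reaches plan enumeration.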
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 164b4af..3101f35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -841,7 +841,16 @@ public abstract class DataSet<T> {
* @see DataSet
*/
public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
- return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
+ switch(strategy) {
+ case OPTIMIZER_CHOOSES:
+ case REPARTITION_SORT_MERGE:
+ case REPARTITION_HASH_SECOND:
+ case BROADCAST_HASH_SECOND:
+ return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
+ default:
+ throw new InvalidProgramException("Invalid JoinHint for LeftOuterJoin: "+strategy);
+ }
+
}
/**
@@ -881,7 +890,15 @@ public abstract class DataSet<T> {
* @see DataSet
*/
public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
- return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
+ switch(strategy) {
+ case OPTIMIZER_CHOOSES:
+ case REPARTITION_SORT_MERGE:
+ case REPARTITION_HASH_FIRST:
+ case BROADCAST_HASH_FIRST:
+ return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
+ default:
+ throw new InvalidProgramException("Invalid JoinHint for RightOuterJoin: "+strategy);
+ }
}
/**
@@ -921,7 +938,13 @@ public abstract class DataSet<T> {
* @see DataSet
*/
public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
- return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
+ switch(strategy) {
+ case OPTIMIZER_CHOOSES:
+ case REPARTITION_SORT_MERGE:
+ return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
+ default:
+ throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
new file mode 100644
index 0000000..fe362f0
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FullOuterJoinOperatorTest {
+
+ // TUPLE DATA
+ private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<>();
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testFullOuter1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2)
+ .where(0).equalTo(4)
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testFullOuter2() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2)
+ .where("f1").equalTo("f3")
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testFullOuter3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo(new IntKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testFullOuter4() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2)
+ .where(0).equalTo(new IntKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testFullOuter5() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo("f4")
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testFullOuter6() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2)
+ .where("f0").equalTo(4)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFullOuter7() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // invalid key position
+ ds1.fullOuterJoin(ds2)
+ .where(5).equalTo(0)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = CompositeType.InvalidFieldReferenceException.class)
+ public void testFullOuter8() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // invalid key reference
+ ds1.fullOuterJoin(ds2)
+ .where(1).equalTo("f5")
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testFullOuter9() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // key types do not match
+ ds1.fullOuterJoin(ds2)
+ .where(0).equalTo(1)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testFullOuter10() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // key types do not match
+ ds1.fullOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo(new LongKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testFullOuterStrategy1() {
+ this.testFullOuterStrategies(JoinHint.OPTIMIZER_CHOOSES);
+ }
+
+ @Test
+ public void testFullOuterStrategy2() {
+ this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testFullOuterStrategy3() {
+ this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testFullOuterStrategy4() {
+ this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testFullOuterStrategy5() {
+ this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testFullOuterStrategy6() {
+ this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
+ }
+
+
+ private void testFullOuterStrategies(JoinHint hint) {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.fullOuterJoin(ds2, hint)
+ .where(0).equalTo(4)
+ .with(new DummyJoin());
+ }
+
+
+ /*
+ * ####################################################################
+ */
+
+ @SuppressWarnings("serial")
+ public static class DummyJoin implements
+ JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+ @Override
+ public Long join(Tuple5<Integer, Long, String, Long, Integer> v1, Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return 1L;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+
+ @Override
+ public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+ return v.f0;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+ @Override
+ public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+ return v.f1;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
new file mode 100644
index 0000000..06b0c13
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LeftOuterJoinOperatorTest {
+
+ // TUPLE DATA
+ private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<>();
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testLeftOuter1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2)
+ .where(0).equalTo(4)
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testLeftOuter2() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2)
+ .where("f1").equalTo("f3")
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testLeftOuter3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo(new IntKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testLeftOuter4() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2)
+ .where(0).equalTo(new IntKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testLeftOuter5() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo("f4")
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testLeftOuter6() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2)
+ .where("f0").equalTo(4)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testLeftOuter7() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // invalid key position
+ ds1.leftOuterJoin(ds2)
+ .where(5).equalTo(0)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = CompositeType.InvalidFieldReferenceException.class)
+ public void testLeftOuter8() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // invalid key reference
+ ds1.leftOuterJoin(ds2)
+ .where(1).equalTo("f5")
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testLeftOuter9() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // key types do not match
+ ds1.leftOuterJoin(ds2)
+ .where(0).equalTo(1)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testLeftOuter10() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // key types do not match
+ ds1.leftOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo(new LongKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testLeftOuterStrategy1() {
+ this.testLeftOuterStrategies(JoinHint.OPTIMIZER_CHOOSES);
+ }
+
+ @Test
+ public void testLeftOuterStrategy2() {
+ this.testLeftOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
+ }
+
+ @Test
+ public void testLeftOuterStrategy3() {
+ this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
+ }
+
+ @Test
+ public void testLeftOuterStrategy4() {
+ this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testLeftOuterStrategy5() {
+ this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testLeftOuterStrategy6() {
+ this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
+ }
+
+
+ private void testLeftOuterStrategies(JoinHint hint) {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.leftOuterJoin(ds2, hint)
+ .where(0).equalTo(4)
+ .with(new DummyJoin());
+ }
+
+
+ /*
+ * ####################################################################
+ */
+
+ @SuppressWarnings("serial")
+ public static class DummyJoin implements
+ JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+ @Override
+ public Long join(Tuple5<Integer, Long, String, Long, Integer> v1, Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return 1L;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+
+ @Override
+ public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+ return v.f0;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+ @Override
+ public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+ return v.f1;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
new file mode 100644
index 0000000..0e407ca
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operator;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RightOuterJoinOperatorTest {
+
+ // TUPLE DATA
+ private static final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<>();
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ @Test
+ public void testRightOuter1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2)
+ .where(0).equalTo(4)
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testRightOuter2() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2)
+ .where("f1").equalTo("f3")
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testRightOuter3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo(new IntKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testRightOuter4() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2)
+ .where(0).equalTo(new IntKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testRightOuter5() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo("f4")
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testRightOuter6() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2)
+ .where("f0").equalTo(4)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testRightOuter7() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // invalid key position
+ ds1.rightOuterJoin(ds2)
+ .where(5).equalTo(0)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = CompositeType.InvalidFieldReferenceException.class)
+ public void testRightOuter8() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // invalid key reference
+ ds1.rightOuterJoin(ds2)
+ .where(1).equalTo("f5")
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testRightOuter9() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // key types do not match
+ ds1.rightOuterJoin(ds2)
+ .where(0).equalTo(1)
+ .with(new DummyJoin());
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testRightOuter10() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // key types do not match
+ ds1.rightOuterJoin(ds2)
+ .where(new IntKeySelector()).equalTo(new LongKeySelector())
+ .with(new DummyJoin());
+ }
+
+ @Test
+ public void testRightOuterStrategy1() {
+ this.testRightOuterStrategies(JoinHint.OPTIMIZER_CHOOSES);
+ }
+
+ @Test
+ public void testRightOuterStrategy2() {
+ this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testRightOuterStrategy3() {
+ this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testRightOuterStrategy4() {
+ this.testRightOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
+ }
+
+ @Test
+ public void testRightOuterStrategy5() {
+ this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
+ }
+
+ @Test
+ public void testRightOuterStrategy6() {
+ this.testRightOuterStrategies(JoinHint.BROADCAST_HASH_FIRST);
+ }
+
+
+ private void testRightOuterStrategies(JoinHint hint) {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ ds1.rightOuterJoin(ds2, hint)
+ .where(0).equalTo(4)
+ .with(new DummyJoin());
+ }
+
+
+ /*
+ * ####################################################################
+ */
+
+ @SuppressWarnings("serial")
+ public static class DummyJoin implements
+ JoinFunction<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+ @Override
+ public Long join(Tuple5<Integer, Long, String, Long, Integer> v1, Tuple5<Integer, Long, String, Long, Integer> v2) throws Exception {
+ return 1L;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class IntKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Integer> {
+
+ @Override
+ public Integer getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+ return v.f0;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class LongKeySelector implements KeySelector<Tuple5<Integer, Long, String, Long, Integer>, Long> {
+
+ @Override
+ public Long getKey(Tuple5<Integer, Long, String, Long, Integer> v) throws Exception {
+ return v.f1;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 553c127..5ff9eaf 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -202,9 +202,11 @@ public abstract class CostEstimator {
addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST:
+ case RIGHT_HYBRIDHASH_BUILD_FIRST:
addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_SECOND:
+ case LEFT_HYBRIDHASH_BUILD_SECOND:
addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST_CACHED:
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
index ebdfcc8..0784de3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoi
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor;
+import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
@@ -56,20 +58,19 @@ public class OuterJoinNode extends TwoInputNode {
JoinHint joinHint = operator.getJoinHint();
joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
- List<OperatorDescriptorDual> list = new ArrayList<>();
- switch (joinHint) {
- case OPTIMIZER_CHOOSES:
- list.add(getSortMergeDescriptor(type, true));
+ List<OperatorDescriptorDual> list;
+ switch (type) {
+ case LEFT:
+ list = createLeftOuterJoinDescriptors(joinHint);
break;
- case REPARTITION_SORT_MERGE:
- list.add(getSortMergeDescriptor(type, false));
+ case RIGHT:
+ list = createRightOuterJoinDescriptors(joinHint);
+ break;
+ case FULL:
+ list = createFullOuterJoinDescriptors(joinHint);
break;
- case REPARTITION_HASH_FIRST:
- case REPARTITION_HASH_SECOND:
- case BROADCAST_HASH_FIRST:
- case BROADCAST_HASH_SECOND:
default:
- throw new CompilerException("Invalid join hint: " + joinHint + " for outer join type: " + type);
+ throw new CompilerException("Unknown outer join type: " + type);
}
Partitioner<?> customPartitioner = operator.getCustomPartitioner();
@@ -81,14 +82,74 @@ public class OuterJoinNode extends TwoInputNode {
return list;
}
- private OperatorDescriptorDual getSortMergeDescriptor(OuterJoinType type, boolean broadcastAllowed) {
- if (type == OuterJoinType.FULL) {
- return new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2);
- } else if (type == OuterJoinType.LEFT) {
- return new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
- } else {
- return new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+ private List<OperatorDescriptorDual> createLeftOuterJoinDescriptors(JoinHint hint) {
+
+ List<OperatorDescriptorDual> list = new ArrayList<>();
+ switch (hint) {
+ case OPTIMIZER_CHOOSES:
+ list.add(new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, true));
+ list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, true));
+ break;
+ case REPARTITION_SORT_MERGE:
+ list.add(new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, false));
+ break;
+ case REPARTITION_HASH_SECOND:
+ list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true));
+ break;
+ case BROADCAST_HASH_SECOND:
+ list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false));
+ break;
+ case BROADCAST_HASH_FIRST:
+ case REPARTITION_HASH_FIRST:
+ default:
+ throw new CompilerException("Invalid join hint: " + hint + " for left outer join");
}
+ return list;
+ }
+
+ private List<OperatorDescriptorDual> createRightOuterJoinDescriptors(JoinHint hint) {
+
+ List<OperatorDescriptorDual> list = new ArrayList<>();
+ switch (hint) {
+ case OPTIMIZER_CHOOSES:
+ list.add(new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, true));
+ list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, true));
+ break;
+ case REPARTITION_SORT_MERGE:
+ list.add(new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, false));
+ break;
+ case REPARTITION_HASH_FIRST:
+ list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true));
+ break;
+ case BROADCAST_HASH_FIRST:
+ list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false));
+ break;
+ case BROADCAST_HASH_SECOND:
+ case REPARTITION_HASH_SECOND:
+ default:
+ throw new CompilerException("Invalid join hint: " + hint + " for right outer join");
+ }
+ return list;
+ }
+
+ private List<OperatorDescriptorDual> createFullOuterJoinDescriptors(JoinHint hint) {
+
+ List<OperatorDescriptorDual> list = new ArrayList<>();
+ switch (hint) {
+ case OPTIMIZER_CHOOSES:
+ list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
+ break;
+ case REPARTITION_SORT_MERGE:
+ list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
+ break;
+ case REPARTITION_HASH_SECOND:
+ case BROADCAST_HASH_SECOND:
+ case BROADCAST_HASH_FIRST:
+ case REPARTITION_HASH_FIRST:
+ default:
+ throw new CompilerException("Invalid join hint: " + hint + " for full outer join");
+ }
+ return list;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
new file mode 100644
index 0000000..8ed7969
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildSecondDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashLeftOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
+
+ public HashLeftOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
+ boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+ super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed);
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.LEFT_HYBRIDHASH_BUILD_SECOND;
+ }
+
+ @Override
+ protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+ // all properties are possible
+ return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+ }
+
+ @Override
+ public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+ LocalProperties produced1, LocalProperties produced2) {
+ return true;
+ }
+
+ @Override
+ public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+ String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
+ return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+ }
+
+ @Override
+ public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+ return new LocalProperties();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
new file mode 100644
index 0000000..5ddba1c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildFirstDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HashRightOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {
+
+ public HashRightOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
+ boolean broadcastFirstAllowed, boolean repartitionAllowed) {
+ super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed);
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_FIRST;
+ }
+
+ @Override
+ protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+ // all properties are possible
+ return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
+ }
+
+ @Override
+ public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+ LocalProperties produced1, LocalProperties produced2) {
+ return true;
+ }
+
+ @Override
+ public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+
+ String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
+ return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
+ }
+
+ @Override
+ public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+ return new LocalProperties();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 8f72754..9ac2ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -25,10 +25,10 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
@@ -85,7 +85,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
if (objectReuseEnabled) {
if (buildSideIndex == 0 && probeSideIndex == 1) {
- matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+ matchIterator = new ReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
@@ -94,12 +94,13 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory,
+ false,
hashJoinUseBitMaps);
} else if (buildSideIndex == 1 && probeSideIndex == 0) {
- matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+ matchIterator = new ReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
@@ -108,6 +109,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory,
+ false,
hashJoinUseBitMaps);
} else {
@@ -116,7 +118,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
} else {
if (buildSideIndex == 0 && probeSideIndex == 1) {
- matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+ matchIterator = new NonReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
@@ -125,12 +127,13 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory,
+ false,
hashJoinUseBitMaps);
} else if (buildSideIndex == 1 && probeSideIndex == 0) {
- matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+ matchIterator = new NonReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>(
input1, input2,
serializer1, comparator1,
serializer2, comparator2,
@@ -139,6 +142,7 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
this.taskContext.getIOManager(),
this.taskContext.getOwningNepheleTask(),
availableMemory,
+ false,
hashJoinUseBitMaps);
} else {
@@ -173,20 +177,20 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
if (objectReuseEnabled) {
if (buildSideIndex == 0 && probeSideIndex == 1) {
- final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+ final ReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input2);
} else {
- final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+ final ReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input1);
}
} else {
if (buildSideIndex == 0 && probeSideIndex == 1) {
- final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+ final NonReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input2);
} else {
- final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+ final NonReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashJoinIterator<IT1, IT2, OT>) this.matchIterator;
matchIterator.reopenProbe(input1);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 8c964d4..2589ca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -81,8 +81,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
final IOManager ioManager = this.taskContext.getIOManager();
// set up memory and I/O parameters
- final double fractionAvailableMemory = config.getRelativeMemoryDriver();
- final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
+ final double driverMemFraction = config.getRelativeMemoryDriver();
final DriverStrategy ls = config.getDriverStrategy();
@@ -121,7 +120,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
pairComparatorFactory,
memoryManager,
ioManager,
- numPages
+ driverMemFraction
);
} else {
this.outerJoinIterator = getNonReusingOuterJoinIterator(
@@ -135,7 +134,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
pairComparatorFactory,
memoryManager,
ioManager,
- numPages
+ driverMemFraction
);
}
@@ -183,7 +182,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception;
protected abstract JoinTaskIterator<IT1, IT2, OT> getNonReusingOuterJoinIterator(
@@ -197,6 +196,6 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<Fl
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception;
}
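
The page computation does not disappear here, it only moves: the merge strategies (in the concrete drivers below) still turn the fraction into a page budget, while the new hash strategies pass the fraction straight through to the hash iterator, which sizes its own table (see HashJoinIteratorBase.getHashJoin later in this patch). A small, self-contained sketch of what that conversion amounts to; the constants are made-up defaults, not values read from any configuration:

public class MemFractionToPages {

    // Conceptual stand-in for MemoryManager.computeNumberOfPages(fraction):
    // the driver's relative share of the task's managed memory, expressed in pages.
    static int computeNumberOfPages(long managedMemoryBytes, int pageSizeBytes, double fraction) {
        return (int) (fraction * (managedMemoryBytes / pageSizeBytes));
    }

    public static void main(String[] args) {
        // e.g. 512 MB of managed memory, 32 KB pages, and a 0.25 driver share -> 4096 pages
        System.out.println(computeNumberOfPages(512L << 20, 32 << 10, 0.25));
    }
}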
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index bc7bee5..b069f12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -73,11 +73,8 @@ public enum DriverStrategy {
// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
INNER_MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
-
LEFT_OUTER_MERGE(LeftOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
-
RIGHT_OUTER_MERGE(RightOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
-
FULL_OUTER_MERGE(FullOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
// co-grouping inputs
@@ -94,6 +91,11 @@ public enum DriverStrategy {
HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+
+ // right outer join, the first input is the build side, the second input is the probe side of a hybrid hash table
+ RIGHT_HYBRIDHASH_BUILD_FIRST(RightOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+ // left outer join, the second input is the build side, the first input is the probe side of a hybrid hash table
+ LEFT_HYBRIDHASH_BUILD_SECOND(LeftOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// the second input is inner loop, the first input is outer loop and block-wise processed
NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
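
The two constants added above (RIGHT_HYBRIDHASH_BUILD_FIRST and LEFT_HYBRIDHASH_BUILD_SECOND) are the runtime counterparts of the hash plans the optimizer can now choose for one-sided outer joins. A sketch of the left-outer case, mirroring the right-outer example earlier; it assumes REPARTITION_HASH_SECOND is now an accepted hint for leftOuterJoin(), and data and function are illustrative only:

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LeftOuterHashJoinExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> left = env.fromElements(
                new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));
        DataSet<Tuple2<Integer, Double>> right = env.fromElements(
                new Tuple2<>(1, 1.0), new Tuple2<>(3, 3.0));

        // The hash table is built on the second (right) input and probed with the left
        // input, i.e. LEFT_HYBRIDHASH_BUILD_SECOND; every left record is emitted.
        left.leftOuterJoin(right, JoinHint.REPARTITION_HASH_SECOND)
                .where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, Double>, String>() {
                    @Override
                    public void join(Tuple2<Integer, String> l, Tuple2<Integer, Double> r, Collector<String> out) {
                        // r is null for left records without a match
                        out.collect(l.f1 + " / " + (r == null ? "null" : r.f1.toString()));
                    }
                })
                .print();
    }
}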
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index d942b72..2c01fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -47,10 +47,11 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception {
switch (driverStrategy) {
case FULL_OUTER_MERGE:
+ int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
return new ReusingMergeOuterJoinIterator<>(
OuterJoinType.FULL,
in1,
@@ -82,10 +83,11 @@ public class FullOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception {
switch (driverStrategy) {
case FULL_OUTER_MERGE:
+ int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
return new NonReusingMergeOuterJoinIterator<>(
OuterJoinType.FULL,
in1,
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index 7a9c8e6..c55843a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -26,10 +26,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -133,23 +133,25 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
break;
case HYBRIDHASH_BUILD_FIRST:
- this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
+ this.joinIterator = new ReusingBuildFirstHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
+ false,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
- this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
+ this.joinIterator = new ReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
+ false,
hashJoinUseBitMaps);
break;
default:
@@ -166,23 +168,25 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT
break;
case HYBRIDHASH_BUILD_FIRST:
- this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
+ this.joinIterator = new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator21(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
+ false,
hashJoinUseBitMaps);
break;
case HYBRIDHASH_BUILD_SECOND:
- this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
+ this.joinIterator = new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
serializer1, comparator1,
serializer2, comparator2,
pairComparatorFactory.createComparator12(comparator1, comparator2),
memoryManager, ioManager,
this.taskContext.getOwningNepheleTask(),
fractionAvailableMemory,
+ false,
hashJoinUseBitMaps);
break;
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
index ae05d1e..49d3648 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -47,10 +49,11 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception {
switch (driverStrategy) {
case LEFT_OUTER_MERGE:
+ int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
return new ReusingMergeOuterJoinIterator<>(
OuterJoinType.LEFT,
in1,
@@ -65,6 +68,16 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
numPages,
super.taskContext.getOwningNepheleTask()
);
+ case LEFT_HYBRIDHASH_BUILD_SECOND:
+ return new ReusingBuildSecondHashJoinIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ driverMemFraction,
+ true,
+ false);
default:
throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
}
@@ -82,10 +95,11 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception {
switch (driverStrategy) {
case LEFT_OUTER_MERGE:
+ int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
return new NonReusingMergeOuterJoinIterator<>(
OuterJoinType.LEFT,
in1,
@@ -100,6 +114,16 @@ public class LeftOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<I
numPages,
super.taskContext.getOwningNepheleTask()
);
+ case LEFT_HYBRIDHASH_BUILD_SECOND:
+ return new NonReusingBuildSecondHashJoinIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator12(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ driverMemFraction,
+ true,
+ false);
default:
throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name());
}
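
In both new LEFT_HYBRIDHASH_BUILD_SECOND branches the iterator constructor ends with two booleans: the first (true) appears to flag the probe side as the outer side, so left records that find no build-side partner are still emitted, and the second (false) disables the hash table's bitmap/Bloom filters, since filtering out probe records that cannot match would silently drop exactly the rows a left outer join has to keep. A toy sketch of that probe-side-outer behaviour using plain Java collections (not Flink's MutableHashTable):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProbeSideOuterJoinSketch {

    public static void main(String[] args) {
        // build side (the right input of a left outer join): key -> records
        List<String[]> build = Arrays.asList(new String[]{"1", "a"}, new String[]{"3", "c"});
        // probe side (the left input): every record must show up in the output
        List<String[]> probe = Arrays.asList(new String[]{"1", "x"}, new String[]{"2", "y"});

        Map<String, List<String[]>> table = new HashMap<>();
        for (String[] rec : build) {
            List<String[]> bucket = table.get(rec[0]);
            if (bucket == null) {
                bucket = new ArrayList<>();
                table.put(rec[0], bucket);
            }
            bucket.add(rec);
        }

        for (String[] p : probe) {
            List<String[]> matches = table.get(p[0]);
            if (matches == null) {
                // probe-side outer join: keep the unmatched probe record, pad with null
                System.out.println(Arrays.toString(p) + " + null");
            } else {
                for (String[] b : matches) {
                    System.out.println(Arrays.toString(p) + " + " + Arrays.toString(b));
                }
            }
        }
        // A build-side Bloom filter would try to skip probe keys such as "2" before probing.
        // That is fine for an inner join but would drop rows an outer join must keep,
        // which is presumably why the bitmap-filter flag is hard-coded to false above.
    }
}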
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
index 6fc8abd..1b67397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -47,10 +49,11 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception {
switch (driverStrategy) {
case RIGHT_OUTER_MERGE:
+ int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
return new ReusingMergeOuterJoinIterator<>(
OuterJoinType.RIGHT,
in1,
@@ -65,6 +68,16 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
numPages,
super.taskContext.getOwningNepheleTask()
);
+ case RIGHT_HYBRIDHASH_BUILD_FIRST:
+ return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator21(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ driverMemFraction,
+ true,
+ false);
default:
throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
}
@@ -82,10 +95,11 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
TypePairComparatorFactory<IT1, IT2> pairComparatorFactory,
MemoryManager memoryManager,
IOManager ioManager,
- int numPages
+ double driverMemFraction
) throws Exception {
switch (driverStrategy) {
case RIGHT_OUTER_MERGE:
+ int numPages = memoryManager.computeNumberOfPages(driverMemFraction);
return new NonReusingMergeOuterJoinIterator<>(
OuterJoinType.RIGHT,
in1,
@@ -100,6 +114,16 @@ public class RightOuterJoinDriver<IT1, IT2, OT> extends AbstractOuterJoinDriver<
numPages,
super.taskContext.getOwningNepheleTask()
);
+ case RIGHT_HYBRIDHASH_BUILD_FIRST:
+ return new NonReusingBuildFirstHashJoinIterator<>(in1, in2,
+ serializer1, comparator1,
+ serializer2, comparator2,
+ pairComparatorFactory.createComparator21(comparator1, comparator2),
+ memoryManager, ioManager,
+ this.taskContext.getOwningNepheleTask(),
+ driverMemFraction,
+ true,
+ false);
default:
throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java
new file mode 100644
index 0000000..30b1df2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashJoinIteratorBase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+
+import java.util.List;
+
+/**
+ * Common methods for all Hash Join Iterators.
+ */
+public class HashJoinIteratorBase {
+
+ public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+ TypeSerializer<BT> buildSideSerializer,
+ TypeComparator<BT> buildSideComparator,
+ TypeSerializer<PT> probeSideSerializer,
+ TypeComparator<PT> probeSideComparator,
+ TypePairComparator<PT, BT> pairComparator,
+ MemoryManager memManager,
+ IOManager ioManager,
+ AbstractInvokable ownerTask,
+ double memoryFraction,
+ boolean useBloomFilters) throws MemoryAllocationException {
+
+ final int numPages = memManager.computeNumberOfPages(memoryFraction);
+ final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+
+ return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+ buildSideComparator, probeSideComparator, pairComparator,
+ memorySegments, ioManager,
+ useBloomFilters);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
deleted file mode 100644
index 3b12c68..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-
-import java.util.List;
-
-/**
- * Common methods for all Hash Join Iterators.
- */
-public class HashMatchIteratorBase {
-
- public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
- TypeSerializer<BT> buildSideSerializer,
- TypeComparator<BT> buildSideComparator,
- TypeSerializer<PT> probeSideSerializer,
- TypeComparator<PT> probeSideComparator,
- TypePairComparator<PT, BT> pairComparator,
- MemoryManager memManager,
- IOManager ioManager,
- AbstractInvokable ownerTask,
- double memoryFraction,
- boolean useBloomFilters) throws MemoryAllocationException {
-
- final int numPages = memManager.computeNumberOfPages(memoryFraction);
- final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-
- return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
- buildSideComparator, probeSideComparator, pairComparator,
- memorySegments, ioManager,
- useBloomFilters);
- }
-}