You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 00:41:10 UTC
[5/5] flink git commit: [FLINK-2105] Implement Sort-Merge Outer Join
algorithm
[FLINK-2105] Implement Sort-Merge Outer Join algorithm
This closes #907
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/941ac6df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/941ac6df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/941ac6df
Branch: refs/heads/master
Commit: 941ac6dfd446d8e97e2fe2f589164978602adf94
Parents: df9f481
Author: r-pogalz <r....@campus.tu-berlin.de>
Authored: Mon Aug 3 12:59:48 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 23:51:27 2015 +0200
----------------------------------------------------------------------
.../operators/sort/AbstractMergeIterator.java | 58 +--
.../sort/AbstractMergeOuterJoinIterator.java | 189 ++++++++
.../sort/NonReusingMergeOuterJoinIterator.java | 60 +++
.../sort/ReusingMergeOuterJoinIterator.java | 63 +++
...bstractSortMergeOuterJoinIteratorITCase.java | 462 +++++++++++++++++++
...ReusingSortMergeInnerJoinIteratorITCase.java | 4 +-
...ReusingSortMergeOuterJoinIteratorITCase.java | 82 ++++
...ReusingSortMergeInnerJoinIteratorITCase.java | 4 +-
...ReusingSortMergeOuterJoinIteratorITCase.java | 82 ++++
.../runtime/operators/testutils/Match.java | 2 +-
.../testutils/MatchRemovingJoiner.java | 58 +++
.../testutils/MatchRemovingMatcher.java | 58 ---
12 files changed, 1030 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
index 9a61c14..c01afc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
@@ -115,20 +115,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
}
/**
- * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
- * from different inputs. The output of the <code>match()</code> method is forwarded.
+ * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come
+ * from different inputs. The output of the <code>join()</code> method is forwarded.
* <p>
* This method first zig-zags between the two sorted inputs in order to find a common
- * key, and then calls the match stub with the cross product of the values.
+ * key, and then calls the join stub with the cross product of the values.
*
* @throws Exception Forwards all exceptions from the user code and the I/O system.
* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
*/
@Override
- public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+ public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
throws Exception;
- protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+ protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
final T1 firstV1 = values1.next();
final T2 firstV2 = values2.next();
@@ -143,23 +143,23 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
if (v2HasNext) {
// both sides contain more than one value
// TODO: Decide which side to spill and which to block!
- crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
+ crossMwithNValues(firstV1, values1, firstV2, values2, joinFunction, collector);
} else {
- crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
+ crossSecond1withNValues(firstV2, firstV1, values1, joinFunction, collector);
}
} else {
if (v2HasNext) {
- crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
+ crossFirst1withNValues(firstV1, firstV2, values2, joinFunction, collector);
} else {
// both sides contain only one value
- matchFunction.join(firstV1, firstV2, collector);
+ joinFunction.join(firstV1, firstV2, collector);
}
}
}
/**
* Crosses a single value from the first input with N values, all sharing a common key.
- * Effectively realizes a <i>1:N</i> match (join).
+ * Effectively realizes a <i>1:N</i> join.
*
* @param val1 The value from the <i>1</i> side.
* @param firstValN The first of the values from the <i>N</i> side.
@@ -167,21 +167,21 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
* @throws Exception Forwards all exceptions thrown by the stub.
*/
private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
- final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+ final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
throws Exception {
T1 copy1 = createCopy(serializer1, val1, this.copy1);
- matchFunction.join(copy1, firstValN, collector);
+ joinFunction.join(copy1, firstValN, collector);
- // set copy and match first element
+ // set copy and join first element
boolean more = true;
do {
final T2 nRec = valsN.next();
if (valsN.hasNext()) {
copy1 = createCopy(serializer1, val1, this.copy1);
- matchFunction.join(copy1, nRec, collector);
+ joinFunction.join(copy1, nRec, collector);
} else {
- matchFunction.join(val1, nRec, collector);
+ joinFunction.join(val1, nRec, collector);
more = false;
}
}
@@ -190,7 +190,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
/**
* Crosses a single value from the second side with N values, all sharing a common key.
- * Effectively realizes a <i>N:1</i> match (join).
+ * Effectively realizes a <i>N:1</i> join.
*
* @param val1 The value from the <i>1</i> side.
* @param firstValN The first of the values from the <i>N</i> side.
@@ -198,20 +198,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
* @throws Exception Forwards all exceptions thrown by the stub.
*/
private void crossSecond1withNValues(T2 val1, T1 firstValN,
- Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+ Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
T2 copy2 = createCopy(serializer2, val1, this.copy2);
- matchFunction.join(firstValN, copy2, collector);
+ joinFunction.join(firstValN, copy2, collector);
- // set copy and match first element
+ // set copy and join first element
boolean more = true;
do {
final T1 nRec = valsN.next();
if (valsN.hasNext()) {
copy2 = createCopy(serializer2, val1, this.copy2);
- matchFunction.join(nRec, copy2, collector);
+ joinFunction.join(nRec, copy2, collector);
} else {
- matchFunction.join(nRec, val1, collector);
+ joinFunction.join(nRec, val1, collector);
more = false;
}
}
@@ -220,7 +220,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
final T2 firstV2, final Iterator<T2> blockVals,
- final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+ final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception {
// ==================================================
// We have one first (head) element from both inputs (firstV1 and firstV2)
// We have an iterator for both inputs.
@@ -237,13 +237,13 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
// 5) cross the head of the spilling side with the next block
// 6) cross the spilling iterator with the next block.
- // match the first values first
+ // join the first values first
T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
T2 blockHeadCopy = this.createCopy(serializer2, firstV2, this.blockHeadCopy);
T1 spillHeadCopy = null;
// --------------- 1) Cross the heads -------------------
- matchFunction.join(copy1, firstV2, collector);
+ joinFunction.join(copy1, firstV2, collector);
// for the remaining values, we do a block-nested-loops join
SpillingResettableIterator<T1> spillIt = null;
@@ -256,7 +256,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
while (this.blockIt.hasNext()) {
final T2 nextBlockRec = this.blockIt.next();
copy1 = this.createCopy(serializer1, firstV1, this.copy1);
- matchFunction.join(copy1, nextBlockRec, collector);
+ joinFunction.join(copy1, nextBlockRec, collector);
}
this.blockIt.reset();
@@ -286,7 +286,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
// -------- 3) cross the iterator of the spilling side with the head of the block side --------
T2 copy2 = this.createCopy(serializer2, blockHeadCopy, this.copy2);
- matchFunction.join(copy1, copy2, collector);
+ joinFunction.join(copy1, copy2, collector);
// -------- 4) cross the iterator of the spilling side with the first block --------
while (this.blockIt.hasNext()) {
@@ -294,7 +294,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
// get instances of key and block value
copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
- matchFunction.join(copy1, nextBlockRec, collector);
+ joinFunction.join(copy1, nextBlockRec, collector);
}
// reset block iterator
this.blockIt.reset();
@@ -316,7 +316,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
while (this.blockIt.hasNext()) {
copy1 = this.createCopy(serializer1, spillHeadCopy, this.copy1);
final T2 nextBlockVal = blockIt.next();
- matchFunction.join(copy1, nextBlockVal, collector);
+ joinFunction.join(copy1, nextBlockVal, collector);
}
this.blockIt.reset();
@@ -329,7 +329,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
// get instances of key and block value
final T2 nextBlockVal = this.blockIt.next();
copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
- matchFunction.join(copy1, nextBlockVal, collector);
+ joinFunction.join(copy1, nextBlockVal, collector);
}
// reset block iterator
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
new file mode 100644
index 0000000..01b371e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+ public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+ private final OuterJoinType outerJoinType;
+
+ private boolean initialized = false;
+ private boolean it1Empty = false;
+ private boolean it2Empty = false;
+
+
+ public AbstractMergeOuterJoinIterator(
+ OuterJoinType outerJoinType,
+ MutableObjectIterator<T1> input1,
+ MutableObjectIterator<T2> input2,
+ TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+ TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+ TypePairComparator<T1, T2> pairComparator,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ int numMemoryPages,
+ AbstractInvokable parentTask)
+ throws MemoryAllocationException {
+ super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
+ this.outerJoinType = outerJoinType;
+ }
+
+ /**
+ * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come
+ * from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no
+ * matching partner from the other input exists are joined with null.
+ * The output of the <code>join()</code> method is forwarded.
+ *
+ * @throws Exception Forwards all exceptions from the user code and the I/O system.
+ * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+ */
+ @Override
+ public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception {
+ if (!initialized) {
+ //first run, set iterators to first elements
+ it1Empty = !this.iterator1.nextKey();
+ it2Empty = !this.iterator2.nextKey();
+ initialized = true;
+ }
+
+ if (it1Empty && it2Empty) {
+ return false;
+ } else if (it2Empty) {
+ if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
+ joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+ it1Empty = !iterator1.nextKey();
+ return true;
+ } else {
+ //consume rest of left side
+ while (iterator1.nextKey()) ;
+ it1Empty = true;
+ return false;
+ }
+ } else if (it1Empty) {
+ if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+ joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+ it2Empty = !iterator2.nextKey();
+ return true;
+ } else {
+ //consume rest of right side
+ while (iterator2.nextKey()) ;
+ it2Empty = true;
+ return false;
+ }
+ } else {
+ final TypePairComparator<T1, T2> comparator = super.pairComparator;
+ comparator.setReference(this.iterator1.getCurrent());
+ T2 current2 = this.iterator2.getCurrent();
+
+ // zig zag
+ while (true) {
+ // determine the relation between the (possibly composite) keys
+ final int comp = comparator.compareToReference(current2);
+
+ if (comp == 0) {
+ break;
+ }
+
+ if (comp < 0) {
+ //right key < left key
+ if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+ //join right key values with null in case of right or full outer join
+ joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+ it2Empty = !iterator2.nextKey();
+ return true;
+ } else {
+ //skip this right key if it is a left outer join
+ if (!this.iterator2.nextKey()) {
+ //if right side is empty, join current left key values with null
+ joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+ it1Empty = !iterator1.nextKey();
+ it2Empty = true;
+ return true;
+ }
+ current2 = this.iterator2.getCurrent();
+ }
+ } else {
+ //right key > left key
+ if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
+ //join left key values with null in case of left or full outer join
+ joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+ it1Empty = !iterator1.nextKey();
+ return true;
+ } else {
+ //skip this left key if it is a right outer join
+ if (!this.iterator1.nextKey()) {
+ //if left side is empty, join current right key values with null
+ joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+ it1Empty = true;
+ it2Empty = !iterator2.nextKey();
+ return true;
+ }
+ comparator.setReference(this.iterator1.getCurrent());
+ }
+ }
+ }
+
+ // here, we have a common key! call the join function with the cross product of the
+ // values
+ final Iterator<T1> values1 = this.iterator1.getValues();
+ final Iterator<T2> values2 = this.iterator2.getValues();
+
+ crossMatchingGroup(values1, values2, joinFunction, collector);
+ it1Empty = !iterator1.nextKey();
+ it2Empty = !iterator2.nextKey();
+ return true;
+ }
+ }
+
+ private void joinLeftKeyValuesWithNull(Iterator<T1> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
+ while (values.hasNext()) {
+ T1 next = values.next();
+ this.copy1 = createCopy(serializer1, next, copy1);
+ joinFunction.join(copy1, null, collector);
+ }
+ }
+
+ private void joinRightKeyValuesWithNull(Iterator<T2> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
+ while (values.hasNext()) {
+ T2 next = values.next();
+ this.copy2 = createCopy(serializer2, next, copy2);
+ joinFunction.join(null, copy2, collector);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
new file mode 100644
index 0000000..ac49ece
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class NonReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {
+
+ public NonReusingMergeOuterJoinIterator(
+ OuterJoinType outerJoinType,
+ MutableObjectIterator<T1> input1,
+ MutableObjectIterator<T2> input2,
+ TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+ TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+ TypePairComparator<T1, T2> pairComparator,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ int numMemoryPages,
+ AbstractInvokable parentTask)
+ throws MemoryAllocationException {
+ super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+ }
+
+ @Override
+ protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+ return new NonReusingKeyGroupedIterator<T>(input, comparator);
+ }
+
+ @Override
+ protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+ return serializer.copy(value);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
new file mode 100644
index 0000000..0cefbc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class ReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {
+
+ public ReusingMergeOuterJoinIterator(
+ OuterJoinType outerJoinType,
+ MutableObjectIterator<T1> input1,
+ MutableObjectIterator<T2> input2,
+ TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+ TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+ TypePairComparator<T1, T2> pairComparator,
+ MemoryManager memoryManager,
+ IOManager ioManager,
+ int numMemoryPages,
+ AbstractInvokable parentTask)
+ throws MemoryAllocationException {
+ super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
+ this.copy1 = serializer1.createInstance();
+ this.spillHeadCopy = serializer1.createInstance();
+ this.copy2 = serializer2.createInstance();
+ this.blockHeadCopy = serializer2.createInstance();
+ }
+
+ @Override
+ protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+ return new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
+ }
+
+ @Override
+ protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { return serializer.copy(value, reuse); }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..1fbe025
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleConstantValueIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+public abstract class AbstractSortMergeOuterJoinIteratorITCase {
+
+ // total memory
+ private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+ private static final int PAGES_FOR_BNLJN = 2;
+
+ // the size of the left and right inputs
+ private static final int INPUT_1_SIZE = 20000;
+
+ private static final int INPUT_2_SIZE = 1000;
+
+ // random seeds for the left and right input data generators
+ private static final long SEED1 = 561349061987311L;
+
+ private static final long SEED2 = 231434613412342L;
+
+ // dummy abstract task
+ private final AbstractInvokable parentTask = new DummyInvokable();
+
+ private IOManager ioManager;
+ private MemoryManager memoryManager;
+
+ private TupleTypeInfo<Tuple2<String, String>> typeInfo1;
+ private TupleTypeInfo<Tuple2<String, Integer>> typeInfo2;
+ private TupleSerializer<Tuple2<String, String>> serializer1;
+ private TupleSerializer<Tuple2<String, Integer>> serializer2;
+ private TypeComparator<Tuple2<String, String>> comparator1;
+ private TypeComparator<Tuple2<String, Integer>> comparator2;
+ private TypePairComparator<Tuple2<String, String>, Tuple2<String, Integer>> pairComp;
+
+
+	/**
+	 * Prepares the per-test fixtures: type information, serializers and ascending key-0 comparators
+	 * for the (String, String) and (String, Integer) sample inputs, their pair comparator, and a
+	 * fresh memory manager and I/O manager.
+	 */
+	@Before
+	public void beforeTest() {
+		final ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.disableObjectReuse();
+
+		// first input: (String, String), compared on field 0 in ascending order
+		this.typeInfo1 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+		this.serializer1 = this.typeInfo1.createSerializer(executionConfig);
+		this.comparator1 = this.typeInfo1.createComparator(new int[]{0}, new boolean[]{true}, 0, executionConfig);
+
+		// second input: (String, Integer), compared on field 0 in ascending order
+		this.typeInfo2 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);
+		this.serializer2 = this.typeInfo2.createSerializer(executionConfig);
+		this.comparator2 = this.typeInfo2.createComparator(new int[]{0}, new boolean[]{true}, 0, executionConfig);
+
+		this.pairComp = new GenericPairComparator<Tuple2<String, String>, Tuple2<String, Integer>>(
+				this.comparator1, this.comparator2);
+
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	/**
+	 * Tears down the per-test fixtures: shuts down the I/O manager and fails if it did not shut
+	 * down properly, then fails if the memory manager still has unreleased memory before shutting
+	 * it down as well.
+	 */
+	@After
+	public void afterTest() {
+		final IOManager io = this.ioManager;
+		if (io != null) {
+			io.shutdown();
+			Assert.assertTrue("I/O manager failed to properly shut down.", io.isProperlyShutDown());
+			this.ioManager = null;
+		}
+
+		final MemoryManager memory = this.memoryManager;
+		if (memory != null) {
+			final boolean allMemoryReturned = memory.verifyEmpty();
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+					allMemoryReturned);
+			memory.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+ protected void testFullOuterWithSample() throws Exception {
+ CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+ new Tuple2<String, String>("Jack", "Engineering"),
+ new Tuple2<String, String>("Tim", "Sales"),
+ new Tuple2<String, String>("Zed", "HR")
+ );
+ CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+ new Tuple2<String, Integer>("Allison", 100),
+ new Tuple2<String, Integer>("Jack", 200),
+ new Tuple2<String, Integer>("Zed", 150),
+ new Tuple2<String, Integer>("Zed", 250)
+ );
+
+ OuterJoinType outerJoinType = OuterJoinType.FULL;
+ List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, outerJoinType);
+
+ List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+ new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+ new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+ new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+ new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+ new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+ );
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ protected void testLeftOuterWithSample() throws Exception {
+ CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+ new Tuple2<String, String>("Jack", "Engineering"),
+ new Tuple2<String, String>("Tim", "Sales"),
+ new Tuple2<String, String>("Zed", "HR")
+ );
+ CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+ new Tuple2<String, Integer>("Allison", 100),
+ new Tuple2<String, Integer>("Jack", 200),
+ new Tuple2<String, Integer>("Zed", 150),
+ new Tuple2<String, Integer>("Zed", 250)
+ );
+
+ List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+
+ List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+ new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+ new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+ new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+ new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+ );
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ protected void testRightOuterWithSample() throws Exception {
+ CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+ new Tuple2<String, String>("Jack", "Engineering"),
+ new Tuple2<String, String>("Tim", "Sales"),
+ new Tuple2<String, String>("Zed", "HR")
+ );
+ CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+ new Tuple2<String, Integer>("Allison", 100),
+ new Tuple2<String, Integer>("Jack", 200),
+ new Tuple2<String, Integer>("Zed", 150),
+ new Tuple2<String, Integer>("Zed", 250)
+ );
+
+ List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+
+ List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+ new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+ new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+ new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+ new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+ );
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ protected void testRightSideEmpty() throws Exception {
+ CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+ new Tuple2<String, String>("Jack", "Engineering"),
+ new Tuple2<String, String>("Tim", "Sales"),
+ new Tuple2<String, String>("Zed", "HR")
+ );
+ CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of();
+
+ List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+ List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+ List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+ List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+ new Tuple4<String, String, String, Object>("Jack", "Engineering", null, null),
+ new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+ new Tuple4<String, String, String, Object>("Zed", "HR", null, null)
+ );
+
+ Assert.assertEquals(expected, actualLeft);
+ Assert.assertEquals(expected, actualFull);
+ Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualRight);
+ }
+
+ protected void testLeftSideEmpty() throws Exception {
+ CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of();
+ CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+ new Tuple2<String, Integer>("Allison", 100),
+ new Tuple2<String, Integer>("Jack", 200),
+ new Tuple2<String, Integer>("Zed", 150),
+ new Tuple2<String, Integer>("Zed", 250)
+ );
+
+ List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+ List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+ List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+ List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+ new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+ new Tuple4<String, String, String, Object>(null, null, "Jack", 200),
+ new Tuple4<String, String, String, Object>(null, null, "Zed", 150),
+ new Tuple4<String, String, String, Object>(null, null, "Zed", 250)
+ );
+
+ Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualLeft);
+ Assert.assertEquals(expected, actualRight);
+ Assert.assertEquals(expected, actualFull);
+ }
+
+	/**
+	 * Runs an outer join of the given type over the two sample inputs and returns everything the
+	 * join function emitted, in emission order.
+	 *
+	 * <p>Both inputs are reset first, so the same iterators can be passed to several consecutive
+	 * calls. The iterator is opened before use (as in
+	 * {@code testOuterJoinWithHighNumberOfCommonKeys}) and is always closed, even when the join
+	 * fails, so that a failing join is not additionally reported as a memory leak by
+	 * {@code afterTest()}.
+	 *
+	 * @param input1 the left input, rewound via {@code reset()} before joining
+	 * @param input2 the right input, rewound via {@code reset()} before joining
+	 * @param outerJoinType the outer join variant to execute
+	 * @return the list of joined tuples collected from the join function
+	 * @throws Exception if resetting the inputs, creating the iterator, or joining fails
+	 */
+	private List<Tuple4<String, String, String, Object>> computeOuterJoin(ResettableMutableObjectIterator<Tuple2<String, String>> input1,
+			ResettableMutableObjectIterator<Tuple2<String, Integer>> input2,
+			OuterJoinType outerJoinType) throws Exception {
+		input1.reset();
+		input2.reset();
+		AbstractMergeOuterJoinIterator<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Object>> iterator =
+				createOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2,
+						pairComp, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+		List<Tuple4<String, String, String, Object>> actual = new ArrayList<Tuple4<String, String, String, Object>>();
+		ListCollector<Tuple4<String, String, String, Object>> collector = new ListCollector<Tuple4<String, String, String, Object>>(actual);
+
+		try {
+			iterator.open();
+			while (iterator.callWithNextKey(new SimpleTupleJoinFunction(), collector)) {
+				// all work happens in callWithNextKey; loop until the inputs are exhausted
+			}
+		} finally {
+			// release the iterator's resources even if the join threw
+			iterator.close();
+		}
+
+		return actual;
+	}
+
+	/**
+	 * Runs an outer join of the given type over two generated inputs that, in addition to their
+	 * generated records, each contain a block of records sharing one constant key, and checks that
+	 * no expected match is left over afterwards.
+	 *
+	 * <p>The expected matches are computed up front with {@code joinValues}; the
+	 * {@code MatchRemovingJoiner} built from that map is presumably responsible for removing each
+	 * match it is called with (its implementation is not part of this class). The iterator only
+	 * gets {@code PAGES_FOR_BNLJN} memory pages so that the duplicate-key block has to spill.
+	 *
+	 * <p>The memory manager and I/O manager created by {@code beforeTest()} are used as they are.
+	 * They must not be replaced here: {@code afterTest()} only shuts down the instances the fields
+	 * currently point to, so overwriting them would leave the original I/O manager running.
+	 *
+	 * @param outerJoinType the outer join variant to execute
+	 * @param input1Size number of generated records in the first input
+	 * @param input1Duplicates number of records with the constant duplicate key in the first input
+	 * @param input1ValueLength value length passed to the first input's generator
+	 * @param input1KeyDensity key density passed to the first input's generator
+	 * @param input2Size number of generated records in the second input
+	 * @param input2Duplicates number of records with the constant duplicate key in the second input
+	 * @param input2ValueLength value length passed to the second input's generator
+	 * @param input2KeyDensity key density passed to the second input's generator
+	 */
+	protected void testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType outerJoinType, int input1Size, int input1Duplicates, int input1ValueLength,
+			float input1KeyDensity, int input2Size, int input2Duplicates, int input2ValueLength, float input2KeyDensity) {
+		TypeSerializer<Tuple2<Integer, String>> serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		TypeSerializer<Tuple2<Integer, String>> serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		TypeComparator<Tuple2<Integer, String>> comparator1 = new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+		TypeComparator<Tuple2<Integer, String>> comparator2 = new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+
+		TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator =
+				new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
+
+		final int DUPLICATE_KEY = 13;
+
+		try {
+			final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, input1KeyDensity, input1ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, input2KeyDensity, input2ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null);
+
+			final TupleGeneratorIterator gen1Iter = new TupleGeneratorIterator(generator1, input1Size);
+			final TupleGeneratorIterator gen2Iter = new TupleGeneratorIterator(generator2, input2Size);
+
+			final TupleConstantValueIterator const1Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", input1Duplicates);
+			final TupleConstantValueIterator const2Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", input2Duplicates);
+
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+
+			// collect expected data
+			final Map<Integer, Collection<Match>> expectedMatchesMap = joinValues(
+					collectData(input1),
+					collectData(input2),
+					outerJoinType);
+
+			// re-create the whole thing for actual processing
+
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+
+			input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+					new MatchRemovingJoiner(expectedMatchesMap);
+
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+
+
+			// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
+			// needs to spill for the duplicate keys
+			AbstractMergeOuterJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					createOuterJoinIterator(
+							outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2,
+							pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+			try {
+				iterator.open();
+
+				while (iterator.callWithNextKey(joinFunction, collector)) {
+					// all work happens in callWithNextKey; loop until the inputs are exhausted
+				}
+			} finally {
+				// release the iterator's resources even if the join threw, so that the
+				// failure is not additionally reported as a memory leak by afterTest()
+				iterator.close();
+			}
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	/**
+	 * Factory hook through which a concrete test class supplies the outer join iterator
+	 * implementation under test. All arguments are meant to be handed on to that iterator's
+	 * constructor.
+	 *
+	 * @param outerJoinType the outer join variant the iterator should execute
+	 * @param input1 the first (left) input
+	 * @param input2 the second (right) input
+	 * @param serializer1 serializer for records of the first input
+	 * @param comparator1 comparator for records of the first input
+	 * @param serializer2 serializer for records of the second input
+	 * @param comparator2 comparator for records of the second input
+	 * @param pairComparator comparator between records of the first and the second input
+	 * @param memoryManager memory manager the iterator may allocate from
+	 * @param ioManager I/O manager the iterator may use
+	 * @param numMemoryPages number of memory pages granted to the iterator
+	 * @param parentTask the task owning the iterator
+	 * @return the iterator under test
+	 * @throws Exception if the iterator cannot be created
+	 */
+	protected abstract <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1,
+			TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2,
+			TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask) throws Exception;
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+
+ private Map<Integer, Collection<Match>> joinValues(
+ Map<Integer, Collection<String>> leftMap,
+ Map<Integer, Collection<String>> rightMap,
+ OuterJoinType outerJoinType) {
+ Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>();
+
+ for (Integer key : leftMap.keySet()) {
+ Collection<String> leftValues = leftMap.get(key);
+ Collection<String> rightValues = rightMap.get(key);
+
+ if (outerJoinType == OuterJoinType.RIGHT && rightValues == null) {
+ continue;
+ }
+
+ if (!map.containsKey(key)) {
+ map.put(key, new ArrayList<Match>());
+ }
+
+ Collection<Match> joinedValues = map.get(key);
+
+ for (String leftValue : leftValues) {
+ if (rightValues != null) {
+ for (String rightValue : rightValues) {
+ joinedValues.add(new Match(leftValue, rightValue));
+ }
+ } else {
+ joinedValues.add(new Match(leftValue, null));
+ }
+ }
+ }
+
+ if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+ for (Integer key : rightMap.keySet()) {
+ Collection<String> leftValues = leftMap.get(key);
+ Collection<String> rightValues = rightMap.get(key);
+
+ if (leftValues != null) {
+ continue;
+ }
+
+ if (!map.containsKey(key)) {
+ map.put(key, new ArrayList<Match>());
+ }
+
+ Collection<Match> joinedValues = map.get(key);
+
+ for (String rightValue : rightValues) {
+ joinedValues.add(new Match(null, rightValue));
+ }
+ }
+ }
+
+ return map;
+ }
+
+
+	/**
+	 * Drains the given iterator and groups the second tuple field (the value) by the first tuple
+	 * field (the key), keeping the values of each key in encounter order.
+	 *
+	 * @param iter the iterator to consume until it returns {@code null}
+	 * @return the values of all consumed records, grouped by key
+	 * @throws Exception if reading from the iterator fails
+	 */
+	private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+			throws Exception {
+		final Map<Integer, Collection<String>> valuesByKey = new HashMap<Integer, Collection<String>>();
+
+		// the tuple returned by next(...) is handed back in as the reuse object of the following call
+		Tuple2<Integer, String> current = iter.next(new Tuple2<Integer, String>());
+		while (current != null) {
+			final Integer key = current.getField(0);
+
+			Collection<String> group = valuesByKey.get(key);
+			if (group == null) {
+				group = new ArrayList<String>();
+				valuesByKey.put(key, group);
+			}
+
+			final String value = current.getField(1);
+			group.add(value);
+
+			current = iter.next(current);
+		}
+
+		return valuesByKey;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index 7fc3734..6548052 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
collectData(input2));
final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
- new MatchRemovingMatcher(expectedMatchesMap);
+ new MatchRemovingJoiner(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
@@ -226,7 +226,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
- final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingMatcher(expectedMatchesMap);
+ final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingJoiner(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..1205bc1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+/**
+ * Runs the shared outer join test cases of {@link AbstractSortMergeOuterJoinIteratorITCase}
+ * against {@link NonReusingMergeOuterJoinIterator}.
+ */
+public class NonReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {
+
+	/** Supplies the {@link NonReusingMergeOuterJoinIterator} as the iterator under test. */
+	@Override
+	protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1,
+			TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2,
+			TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask) throws Exception {
+		return new NonReusingMergeOuterJoinIterator(
+				outerJoinType, input1, input2,
+				serializer1, comparator1, serializer2, comparator2, pairComparator,
+				memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	// ------------------------------------------------------------------------
+	// The sample-based cases live in the base class as protected methods; they
+	// are re-declared here as public @Test methods so that JUnit runs them.
+	// ------------------------------------------------------------------------
+
+	@Test
+	@Override
+	public void testFullOuterWithSample() throws Exception {
+		super.testFullOuterWithSample();
+	}
+
+	@Test
+	@Override
+	public void testLeftOuterWithSample() throws Exception {
+		super.testLeftOuterWithSample();
+	}
+
+	@Test
+	@Override
+	public void testRightOuterWithSample() throws Exception {
+		super.testRightOuterWithSample();
+	}
+
+	@Test
+	@Override
+	public void testRightSideEmpty() throws Exception {
+		super.testRightSideEmpty();
+	}
+
+	@Test
+	@Override
+	public void testLeftSideEmpty() throws Exception {
+		super.testLeftSideEmpty();
+	}
+
+	// ------------------------------------------------------------------------
+	// Generated-data cases; per input the arguments are:
+	// size, duplicates, value length, key density
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testFullOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(
+				OuterJoinType.FULL,
+				200, 500, 2048, 0.02f,
+				200, 500, 2048, 0.02f);
+	}
+
+	@Test
+	public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(
+				OuterJoinType.LEFT,
+				200, 10, 4096, 0.02f,
+				100, 4000, 2048, 0.02f);
+	}
+
+	@Test
+	public void testRightOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(
+				OuterJoinType.RIGHT,
+				100, 10, 2048, 0.02f,
+				200, 4000, 4096, 0.02f);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index e4eec86..39316e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
collectData(input2));
final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
- new MatchRemovingMatcher(expectedMatchesMap);
+ new MatchRemovingJoiner(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
@@ -226,7 +226,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
- final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingMatcher(expectedMatchesMap);
+ final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingJoiner(expectedMatchesMap);
final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..b4fbd80
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+/**
+ * Runs the shared outer join test cases of {@link AbstractSortMergeOuterJoinIteratorITCase}
+ * against {@link ReusingMergeOuterJoinIterator}.
+ */
+public class ReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {
+
+	/** Supplies the {@link ReusingMergeOuterJoinIterator} as the iterator under test. */
+	@Override
+	protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1,
+			TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2,
+			TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask) throws Exception {
+		return new ReusingMergeOuterJoinIterator(
+				outerJoinType, input1, input2,
+				serializer1, comparator1, serializer2, comparator2, pairComparator,
+				memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	// ------------------------------------------------------------------------
+	// The sample-based cases live in the base class as protected methods; they
+	// are re-declared here as public @Test methods so that JUnit runs them.
+	// ------------------------------------------------------------------------
+
+	@Test
+	@Override
+	public void testFullOuterWithSample() throws Exception {
+		super.testFullOuterWithSample();
+	}
+
+	@Test
+	@Override
+	public void testLeftOuterWithSample() throws Exception {
+		super.testLeftOuterWithSample();
+	}
+
+	@Test
+	@Override
+	public void testRightOuterWithSample() throws Exception {
+		super.testRightOuterWithSample();
+	}
+
+	@Test
+	@Override
+	public void testRightSideEmpty() throws Exception {
+		super.testRightSideEmpty();
+	}
+
+	@Test
+	@Override
+	public void testLeftSideEmpty() throws Exception {
+		super.testLeftSideEmpty();
+	}
+
+	// ------------------------------------------------------------------------
+	// Generated-data cases; per input the arguments are:
+	// size, duplicates, value length, key density
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testFullOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(
+				OuterJoinType.FULL,
+				200, 500, 2048, 0.02f,
+				200, 500, 2048, 0.02f);
+	}
+
+	@Test
+	public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(
+				OuterJoinType.LEFT,
+				200, 10, 4096, 0.02f,
+				100, 4000, 2048, 0.02f);
+	}
+
+	@Test
+	public void testRightOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(
+				OuterJoinType.RIGHT,
+				100, 10, 2048, 0.02f,
+				200, 4000, 4096, 0.02f);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
index 539d864..4ac9093 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
/**
* Utility class for keeping track of matches in join operator tests.
*
- * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher
+ * @see MatchRemovingJoiner
*/
public class Match {
private final String left;
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
new file mode 100644
index 0000000..e588d92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * Join function for tests that checks every joined pair against a map of expected matches.
+ * Each pair passed to {@link #join} is removed from the map; a pair that is not present
+ * fails the test through {@code Assert.fail}. This function never writes to the collector.
+ */
+public final class MatchRemovingJoiner implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> {
+ private static final long serialVersionUID = 1L;
+
+ /** Expected matches per key; the map is supplied by the caller and mutated by {@link #join}. */
+ private final Map<Integer, Collection<Match>> toRemoveFrom;
+
+ /**
+ * Creates a joiner that removes produced matches from the given map.
+ *
+ * @param map expected matches grouped by key; stored by reference, not copied
+ */
+ public MatchRemovingJoiner(Map<Integer, Collection<Match>> map) {
+ this.toRemoveFrom = map;
+ }
+
+ /**
+ * Removes the match formed by the two records from the expected matches.
+ *
+ * @param rec1 record from the first input, or {@code null}; its value is then treated as {@code null}
+ * @param rec2 record from the second input, or {@code null}; its value is then treated as {@code null}
+ * @param out collector, not used by this implementation
+ * @throws Exception declared by the interface; a missing key or missing match is reported via {@code Assert.fail}
+ */
+ @Override
+ public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
+ // The key comes from whichever record is present; rec2 is dereferenced when rec1 is null,
+ // so at least one of the two records must be non-null.
+ final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0);
+ final String value1 = rec1 != null ? (String) rec1.getField(1) : null;
+ final String value2 = rec2 != null ? (String) rec2.getField(1) : null;
+
+ Collection<Match> matches = this.toRemoveFrom.get(key);
+ if (matches == null) {
+ // No expected matches are registered for this key. Assert.fail throws, so the code below is not reached.
+ Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
+ }
+
+ boolean contained = matches.remove(new Match(value1, value2));
+ if (!contained) {
+ Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
+ }
+ // Drop the key once its last expected match has been removed.
+ if (matches.isEmpty()) {
+ this.toRemoveFrom.remove(key);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
deleted file mode 100644
index f69b4d7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-import java.util.Collection;
-import java.util.Map;
-
-
-public final class MatchRemovingMatcher implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> {
- private static final long serialVersionUID = 1L;
-
- private final Map<Integer, Collection<Match>> toRemoveFrom;
-
- public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) {
- this.toRemoveFrom = map;
- }
-
- @Override
- public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
- final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0);
- final String value1 = rec1 != null ? (String) rec1.getField(1) : null;
- final String value2 = rec2 != null ? (String) rec2.getField(1) : null;
-
- Collection<Match> matches = this.toRemoveFrom.get(key);
- if (matches == null) {
- Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
- }
-
- boolean contained = matches.remove(new Match(value1, value2));
- if (!contained) {
- Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
- }
- if (matches.isEmpty()) {
- this.toRemoveFrom.remove(key);
- }
- }
-}