Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 00:41:06 UTC

[1/5] flink git commit: [FLINK-2105] Extract abstract superclass, interface from MergeMatchIterators, KeyGroupedIterators

Repository: flink
Updated Branches:
  refs/heads/master 30761572b -> 941ac6dfd


[FLINK-2105] Extract abstract superclass, interface from MergeMatchIterators, KeyGroupedIterators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0dc6849a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0dc6849a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0dc6849a

Branch: refs/heads/master
Commit: 0dc6849a594b61a6cad8ee582ca1758f0349a72b
Parents: 3076157
Author: Johann Kovacs <me...@jkovacs.de>
Authored: Fri Jul 10 17:21:58 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 21:35:26 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/AbstractMergeIterator.java   | 356 +++++++++++++++++
 .../sort/AbstractMergeMatchIterator.java        | 107 +++++
 .../sort/NonReusingMergeMatchIterator.java      | 382 +-----------------
 .../sort/ReusingMergeMatchIterator.java         | 389 +------------------
 .../flink/runtime/util/KeyGroupedIterator.java  |  31 ++
 .../util/NonReusingKeyGroupedIterator.java      |   2 +-
 .../runtime/util/ReusingKeyGroupedIterator.java |   5 +-
 7 files changed, 517 insertions(+), 755 deletions(-)
----------------------------------------------------------------------
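
In short, this change moves all shared sort-merge join logic into a new AbstractMergeIterator and reduces the two concrete iterators to two hooks: how a key-grouped iterator is created and how records are copied. The following sketch is distilled from the diffs below for orientation, it is not additional code from the commit:

    import org.apache.flink.api.common.typeutils.TypeSerializer;

    // Sketch only: the essence of the createCopy hook that the subclasses below
    // implement. The non-reusing variant always produces a fresh object, the
    // reusing variant copies into the object passed as "reuse".
    final class CreateCopyHooksSketch {

        static <T> T nonReusingCopy(TypeSerializer<T> serializer, T value, T reuse) {
            return serializer.copy(value);           // reuse object is ignored
        }

        static <T> T reusingCopy(TypeSerializer<T> serializer, T value, T reuse) {
            return serializer.copy(value, reuse);    // record is copied into reuse
        }
    }

The createKeyGroupedIterator hook works the same way: one subclass returns a NonReusingKeyGroupedIterator, the other a ReusingKeyGroupedIterator.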


http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
new file mode 100644
index 0000000..9a61c14
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
+import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
+
+	private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	protected TypePairComparator<T1, T2> pairComparator;
+
+	protected KeyGroupedIterator<T1> iterator1;
+	protected KeyGroupedIterator<T2> iterator2;
+
+	protected final TypeSerializer<T1> serializer1;
+	protected final TypeSerializer<T2> serializer2;
+
+	private final NonReusingBlockResettableIterator<T2> blockIt;    // for N:M cross products with same key
+
+	private final IOManager ioManager;
+	private final MemoryManager memoryManager;
+	private final List<MemorySegment> memoryForSpillingIterator;
+
+	// instances for object reuse
+	protected T1 copy1;
+	protected T1 spillHeadCopy;
+	protected T2 copy2;
+	protected T2 blockHeadCopy;
+
+	public AbstractMergeIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
+								TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+								TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+								TypePairComparator<T1, T2> pairComparator,
+								MemoryManager memoryManager,
+								IOManager ioManager,
+								int numMemoryPages,
+								AbstractInvokable parentTask) throws MemoryAllocationException {
+		if (numMemoryPages < 2) {
+			throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
+		}
+
+		this.pairComparator = pairComparator;
+		this.serializer1 = serializer1;
+		this.serializer2 = serializer2;
+
+		this.memoryManager = memoryManager;
+		this.ioManager = ioManager;
+
+		this.iterator1 = createKeyGroupedIterator(input1, serializer1, comparator1.duplicate());
+		this.iterator2 = createKeyGroupedIterator(input2, serializer2, comparator2.duplicate());
+
+		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
+		this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
+				(numMemoryPages - numPagesForSpiller), parentTask);
+		this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
+	}
+
+	@Override
+	public void open() throws IOException {
+	}
+
+	@Override
+	public void close() {
+		if (this.blockIt != null) {
+			try {
+				this.blockIt.close();
+			} catch (Throwable t) {
+				LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
+			}
+		}
+
+		this.memoryManager.release(this.memoryForSpillingIterator);
+	}
+
+	@Override
+	public void abort() {
+		close();
+	}
+
+	/**
+	 * Calls the <code>JoinFunction#match()</code> method for each pair of values that share the same key and come
+	 * from different inputs. The output of the <code>match()</code> method is forwarded.
+	 * <p>
+	 * This method first zig-zags between the two sorted inputs in order to find a common
+	 * key, and then calls the match stub with the cross product of the values.
+	 *
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+	 */
+	@Override
+	public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+			throws Exception;
+
+	protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+		final T1 firstV1 = values1.next();
+		final T2 firstV2 = values2.next();
+
+		final boolean v1HasNext = values1.hasNext();
+		final boolean v2HasNext = values2.hasNext();
+
+		// check if one side is already empty
+		// this check could be omitted if we put this in MatchTask.
+		// then we can derive the local strategy (with build side).
+
+		if (v1HasNext) {
+			if (v2HasNext) {
+				// both sides contain more than one value
+				// TODO: Decide which side to spill and which to block!
+				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
+			} else {
+				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
+			}
+		} else {
+			if (v2HasNext) {
+				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
+			} else {
+				// both sides contain only one value
+				matchFunction.join(firstV1, firstV2, collector);
+			}
+		}
+	}
+
+	/**
+	 * Crosses a single value from the first input with N values, all sharing a common key.
+	 * Effectively realizes a <i>1:N</i> match (join).
+	 *
+	 * @param val1      The value from the <i>1</i> side.
+	 * @param firstValN The first of the values from the <i>N</i> side.
+	 * @param valsN     Iterator over remaining <i>N</i> side values.
+	 * @throws Exception Forwards all exceptions thrown by the stub.
+	 */
+	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
+										final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+			throws Exception {
+		T1 copy1 = createCopy(serializer1, val1, this.copy1);
+		matchFunction.join(copy1, firstValN, collector);
+
+		// set copy and match first element
+		boolean more = true;
+		do {
+			final T2 nRec = valsN.next();
+
+			if (valsN.hasNext()) {
+				copy1 = createCopy(serializer1, val1, this.copy1);
+				matchFunction.join(copy1, nRec, collector);
+			} else {
+				matchFunction.join(val1, nRec, collector);
+				more = false;
+			}
+		}
+		while (more);
+	}
+
+	/**
+	 * Crosses a single value from the second side with N values, all sharing a common key.
+	 * Effectively realizes a <i>N:1</i> match (join).
+	 *
+	 * @param val1      The value from the <i>1</i> side.
+	 * @param firstValN The first of the values from the <i>N</i> side.
+	 * @param valsN     Iterator over remaining <i>N</i> side values.
+	 * @throws Exception Forwards all exceptions thrown by the stub.
+	 */
+	private void crossSecond1withNValues(T2 val1, T1 firstValN,
+										Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+		T2 copy2 = createCopy(serializer2, val1, this.copy2);
+		matchFunction.join(firstValN, copy2, collector);
+
+		// set copy and match first element
+		boolean more = true;
+		do {
+			final T1 nRec = valsN.next();
+
+			if (valsN.hasNext()) {
+				copy2 = createCopy(serializer2, val1, this.copy2);
+				matchFunction.join(nRec, copy2, collector);
+			} else {
+				matchFunction.join(nRec, val1, collector);
+				more = false;
+			}
+		}
+		while (more);
+	}
+
+	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
+									final T2 firstV2, final Iterator<T2> blockVals,
+									final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+		// ==================================================
+		// We have one first (head) element from both inputs (firstV1 and firstV2)
+		// We have an iterator for both inputs.
+		// we make the V1 side the spilling side and the V2 side the blocking side.
+		// In order to get the full cross product without unnecessary spilling, we do the
+		// following:
+		// 1) cross the heads
+		// 2) cross the head of the spilling side against the first block of the blocking side
+		// 3) cross the iterator of the spilling side with the head of the block side
+		// 4) cross the iterator of the spilling side with the first block
+		// ---------------------------------------------------
+		// If the blocking side has more than one block, we really need to make the spilling side fully
+		// resettable. For each further block on the block side, we do:
+		// 5) cross the head of the spilling side with the next block
+		// 6) cross the spilling iterator with the next block.
+
+		// match the first values first
+		T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
+		T2 blockHeadCopy = this.createCopy(serializer2, firstV2, this.blockHeadCopy);
+		T1 spillHeadCopy = null;
+
+		// --------------- 1) Cross the heads -------------------
+		matchFunction.join(copy1, firstV2, collector);
+
+		// for the remaining values, we do a block-nested-loops join
+		SpillingResettableIterator<T1> spillIt = null;
+
+		try {
+			// create block iterator on the second input
+			this.blockIt.reopen(blockVals);
+
+			// ------------- 2) cross the head of the spilling side with the first block ------------------
+			while (this.blockIt.hasNext()) {
+				final T2 nextBlockRec = this.blockIt.next();
+				copy1 = this.createCopy(serializer1, firstV1, this.copy1);
+				matchFunction.join(copy1, nextBlockRec, collector);
+			}
+			this.blockIt.reset();
+
+			// spilling is required if the blocked input has data beyond the current block.
+			// in that case, create the spilling iterator
+			final Iterator<T1> leftSideIter;
+			final boolean spillingRequired = this.blockIt.hasFurtherInput();
+			if (spillingRequired) {
+				// more data than would fit into one block. we need to wrap the other side in a spilling iterator
+				// create spilling iterator on first input
+				spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
+						this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
+				leftSideIter = spillIt;
+				spillIt.open();
+
+				spillHeadCopy = this.createCopy(serializer1, firstV1, this.spillHeadCopy);
+			} else {
+				leftSideIter = spillVals;
+			}
+
+			// cross the values in the v1 iterator against the current block
+
+			while (leftSideIter.hasNext()) {
+				final T1 nextSpillVal = leftSideIter.next();
+				copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
+
+
+				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
+				T2 copy2 = this.createCopy(serializer2, blockHeadCopy, this.copy2);
+				matchFunction.join(copy1, copy2, collector);
+
+				// -------- 4) cross the iterator of the spilling side with the first block --------
+				while (this.blockIt.hasNext()) {
+					T2 nextBlockRec = this.blockIt.next();
+
+					// get instances of key and block value
+					copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
+					matchFunction.join(copy1, nextBlockRec, collector);
+				}
+				// reset block iterator
+				this.blockIt.reset();
+			}
+
+			// if everything from the block-side fit into a single block, we are done.
+			// note that in this special case, we did not create a spilling iterator at all
+			if (!spillingRequired) {
+				return;
+			}
+
+			// here we are, because we have more blocks on the block side
+			// loop as long as there are blocks from the blocked input
+			while (this.blockIt.nextBlock()) {
+				// rewind the spilling iterator
+				spillIt.reset();
+
+				// ------------- 5) cross the head of the spilling side with the next block ------------
+				while (this.blockIt.hasNext()) {
+					copy1 = this.createCopy(serializer1, spillHeadCopy, this.copy1);
+					final T2 nextBlockVal = blockIt.next();
+					matchFunction.join(copy1, nextBlockVal, collector);
+				}
+				this.blockIt.reset();
+
+				// -------- 6) cross the spilling iterator with the next block. ------------------
+				while (spillIt.hasNext()) {
+					// get value from resettable iterator
+					final T1 nextSpillVal = spillIt.next();
+					// cross value with block values
+					while (this.blockIt.hasNext()) {
+						// get instances of key and block value
+						final T2 nextBlockVal = this.blockIt.next();
+						copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
+						matchFunction.join(copy1, nextBlockVal, collector);
+					}
+
+					// reset block iterator
+					this.blockIt.reset();
+				}
+				// reset v1 iterator
+				spillIt.reset();
+			}
+		} finally {
+			if (spillIt != null) {
+				this.memoryForSpillingIterator.addAll(spillIt.close());
+			}
+		}
+	}
+
+
+	protected abstract <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator);
+
+	/**
+	 * Copies an instance of the given type, potentially reusing the object passed as the reuse parameter, which may be null.
+	 */
+	protected abstract <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse);
+
+}
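
The numbered steps (1) to (6) in crossMwithNValues above implement a block-nested-loops cross product: the blocking side is consumed in memory-backed blocks, and the spilling side is replayed against each block. A minimal, self-contained sketch of the same idea, in-memory only and without Flink's spilling, memory management, or object reuse (all names below are illustrative), looks like this:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Illustrative only: crosses every element of a "spilling" side with every
    // element of a "blocking" side, processing the blocking side block by block,
    // the way the real iterator does with memory-backed blocks and spill files.
    final class BlockNestedLoopsSketch {

        interface Joiner<A, B> {
            void join(A left, B right);
        }

        static <A, B> void cross(Iterator<A> spillingSide, Iterator<B> blockingSide,
                                 int blockSize, Joiner<A, B> joiner) {
            // materialize the spilling side once so it can be replayed per block
            // (the real code spills it to disk instead of keeping it on the heap)
            List<A> left = new ArrayList<>();
            spillingSide.forEachRemaining(left::add);

            List<B> block = new ArrayList<>(blockSize);
            while (blockingSide.hasNext()) {
                // read the next block of the blocking side
                block.clear();
                while (block.size() < blockSize && blockingSide.hasNext()) {
                    block.add(blockingSide.next());
                }
                // cross the replayable spilling side with the current block
                for (A a : left) {
                    for (B b : block) {
                        joiner.join(a, b);
                    }
                }
            }
        }
    }

The real iterator additionally wraps the spilling side in a SpillingResettableIterator when the blocking side does not fit into a single block, which is exactly the spillingRequired branch above.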

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
new file mode 100644
index 0000000..791494d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * matching through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeMatchIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+	public AbstractMergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
+									TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+									TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+									TypePairComparator<T1, T2> pairComparator,
+									MemoryManager memoryManager,
+									IOManager ioManager,
+									int numMemoryPages,
+									AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	/**
+	 * Calls the <code>JoinFunction#match()</code> method for each pair of values that share the same key and come
+	 * from different inputs. The output of the <code>match()</code> method is forwarded.
+	 * <p>
+	 * This method first zig-zags between the two sorted inputs in order to find a common
+	 * key, and then calls the match stub with the cross product of the values.
+	 *
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+	 */
+	@Override
+	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+			throws Exception {
+		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+			// consume all remaining keys (hack to prevent remaining inputs during iterations, let's get rid of this soon)
+			while (this.iterator1.nextKey()) ;
+			while (this.iterator2.nextKey()) ;
+
+			return false;
+		}
+
+		final TypePairComparator<T1, T2> comparator = this.pairComparator;
+		comparator.setReference(this.iterator1.getCurrent());
+		T2 current2 = this.iterator2.getCurrent();
+
+		// zig zag
+		while (true) {
+			// determine the relation between the (possibly composite) keys
+			final int comp = comparator.compareToReference(current2);
+
+			if (comp == 0) {
+				break;
+			}
+
+			if (comp < 0) {
+				if (!this.iterator2.nextKey()) {
+					return false;
+				}
+				current2 = this.iterator2.getCurrent();
+			} else {
+				if (!this.iterator1.nextKey()) {
+					return false;
+				}
+				comparator.setReference(this.iterator1.getCurrent());
+			}
+		}
+
+		// here, we have a common key! call the match function with the cross product of the
+		// values
+		final Iterator<T1> values1 = this.iterator1.getValues();
+		final Iterator<T2> values2 = this.iterator2.getValues();
+
+		crossMatchingGroup(values1, values2, matchFunction, collector);
+		return true;
+	}
+}
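
For context, callWithNextKey is meant to be driven in a loop by the join task: each call advances both inputs to the next common key and emits the cross product of that key's values, and a return value of false signals that one input is exhausted. A usage sketch follows; constructing the iterator, the join function, and the collector is assumed to happen elsewhere, e.g. via the constructors shown above:

    import org.apache.flink.api.common.functions.FlatJoinFunction;
    import org.apache.flink.runtime.operators.util.JoinTaskIterator;
    import org.apache.flink.util.Collector;

    // Sketch of the driver loop; the iterator, join function and collector are
    // assumed to be constructed elsewhere.
    final class MergeJoinDriverSketch {

        static <T1, T2, O> void run(JoinTaskIterator<T1, T2, O> joinIterator,
                                    FlatJoinFunction<T1, T2, O> joinFunction,
                                    Collector<O> output) throws Exception {
            joinIterator.open();
            try {
                // each call joins the cross product of one common key and returns
                // false once either input runs out of keys
                while (joinIterator.callWithNextKey(joinFunction, output)) {
                    // records are emitted through the collector inside the call
                }
            } finally {
                joinIterator.close();
            }
        }
    }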

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
index c89b5c5..9705778 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,60 +18,19 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
 
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
-
-	/**
-	 * The log used by this iterator to log messages.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(NonReusingMergeMatchIterator.class);
-
-	// --------------------------------------------------------------------------------------------
-
-	private TypePairComparator<T1, T2> comp;
-
-	private NonReusingKeyGroupedIterator<T1> iterator1;
-
-	private NonReusingKeyGroupedIterator<T2> iterator2;
-
-	private final TypeSerializer<T1> serializer1;
-
-	private final TypeSerializer<T2> serializer2;
-
-	private final NonReusingBlockResettableIterator<T2> blockIt;	// for N:M cross products with same key
-
-	private final List<MemorySegment> memoryForSpillingIterator;
-
-	private final MemoryManager memoryManager;
-
-	private final IOManager ioManager;
-
-	// --------------------------------------------------------------------------------------------
+public class NonReusingMergeMatchIterator<T1, T2, O> extends AbstractMergeMatchIterator<T1, T2, O> {
 
 	public NonReusingMergeMatchIterator(
 			MutableObjectIterator<T1> input1,
@@ -83,341 +42,18 @@ public class NonReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator
 			IOManager ioManager,
 			int numMemoryPages,
 			AbstractInvokable parentTask)
-	throws MemoryAllocationException
-	{
-		if (numMemoryPages < 2) {
-			throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
-		}
-
-		this.comp = pairComparator;
-		this.serializer1 = serializer1;
-		this.serializer2 = serializer2;
-
-		this.memoryManager = memoryManager;
-		this.ioManager = ioManager;
-
-		this.iterator1 = new NonReusingKeyGroupedIterator<T1>(input1, comparator1.duplicate());
-		this.iterator2 = new NonReusingKeyGroupedIterator<T2>(input2, comparator2.duplicate());
-
-		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
-		this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
-			(numMemoryPages - numPagesForSpiller), parentTask);
-		this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
 	}
 
-
-	@Override
-	public void open() throws IOException {}
-
-
-	@Override
-	public void close() {
-		if (this.blockIt != null) {
-			try {
-				this.blockIt.close();
-			}
-			catch (Throwable t) {
-				LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
-			}
-		}
-
-		this.memoryManager.release(this.memoryForSpillingIterator);
-	}
-
-
 	@Override
-	public void abort() {
-		close();
+	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+		return new NonReusingKeyGroupedIterator<T>(input, comparator);
 	}
 
-	/**
-	 * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
-	 * from different inputs. The output of the <code>match()</code> method is forwarded.
-	 * <p>
-	 * This method first zig-zags between the two sorted inputs in order to find a common
-	 * key, and then calls the match stub with the cross product of the values.
-	 *
-	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
-	 *
-	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
-	 */
 	@Override
-	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-			// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
-			while (this.iterator1.nextKey());
-			while (this.iterator2.nextKey());
-			
-			return false;
-		}
-
-		final TypePairComparator<T1, T2> comparator = this.comp;
-		comparator.setReference(this.iterator1.getCurrent());
-		T2 current2 = this.iterator2.getCurrent();
-				
-		// zig zag
-		while (true) {
-			// determine the relation between the (possibly composite) keys
-			final int comp = comparator.compareToReference(current2);
-			
-			if (comp == 0) {
-				break;
-			}
-			
-			if (comp < 0) {
-				if (!this.iterator2.nextKey()) {
-					return false;
-				}
-				current2 = this.iterator2.getCurrent();
-			}
-			else {
-				if (!this.iterator1.nextKey()) {
-					return false;
-				}
-				comparator.setReference(this.iterator1.getCurrent());
-			}
-		}
-		
-		// here, we have a common key! call the match function with the cross product of the
-		// values
-		final NonReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
-		final NonReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
-		
-		final T1 firstV1 = values1.next();
-		final T2 firstV2 = values2.next();	
-			
-		final boolean v1HasNext = values1.hasNext();
-		final boolean v2HasNext = values2.hasNext();
-
-		// check if one side is already empty
-		// this check could be omitted if we put this in MatchTask.
-		// then we can derive the local strategy (with build side).
-		
-		if (v1HasNext) {
-			if (v2HasNext) {
-				// both sides contain more than one value
-				// TODO: Decide which side to spill and which to block!
-				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
-			} else {
-				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
-			}
-		} else {
-			if (v2HasNext) {
-				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
-			} else {
-				// both sides contain only one value
-				matchFunction.join(firstV1, firstV2, collector);
-			}
-		}
-		return true;
-	}
-
-	/**
-	 * Crosses a single value from the first input with N values, all sharing a common key.
-	 * Effectively realizes a <i>1:N</i> match (join).
-	 * 
-	 * @param val1 The value form the <i>1</i> side.
-	 * @param firstValN The first of the values from the <i>N</i> side.
-	 * @param valsN Iterator over remaining <i>N</i> side values.
-	 *          
-	 * @throws Exception Forwards all exceptions thrown by the stub.
-	 */
-	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-			final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		T1 copy1 = this.serializer1.copy(val1);
-		matchFunction.join(copy1, firstValN, collector);
-		
-		// set copy and match first element
-		boolean more = true;
-		do {
-			final T2 nRec = valsN.next();
-			
-			if (valsN.hasNext()) {
-				copy1 = this.serializer1.copy(val1);
-				matchFunction.join(copy1, nRec, collector);
-			} else {
-				matchFunction.join(val1, nRec, collector);
-				more = false;
-			}
-		}
-		while (more);
-	}
-	
-	/**
-	 * Crosses a single value from the second side with N values, all sharing a common key.
-	 * Effectively realizes a <i>N:1</i> match (join).
-	 * 
-	 * @param val1 The value form the <i>1</i> side.
-	 * @param firstValN The first of the values from the <i>N</i> side.
-	 * @param valsN Iterator over remaining <i>N</i> side values.
-	 *          
-	 * @throws Exception Forwards all exceptions thrown by the stub.
-	 */
-	private void crossSecond1withNValues(T2 val1, T1 firstValN,
-			Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		T2 copy2 = this.serializer2.copy(val1);
-		matchFunction.join(firstValN, copy2, collector);
-		
-		// set copy and match first element
-		boolean more = true;
-		do {
-			final T1 nRec = valsN.next();
-			
-			if (valsN.hasNext()) {
-				copy2 = this.serializer2.copy(val1);
-				matchFunction.join(nRec, copy2, collector);
-			} else {
-				matchFunction.join(nRec, val1, collector);
-				more = false;
-			}
-		}
-		while (more);
+	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+		return serializer.copy(value);
 	}
-	
-	/**
-	 * @param firstV1
-	 * @param spillVals
-	 * @param firstV2
-	 * @param blockVals
-	 */
-	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
-			final T2 firstV2, final Iterator<T2> blockVals,
-			final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		// ==================================================
-		// We have one first (head) element from both inputs (firstV1 and firstV2)
-		// We have an iterator for both inputs.
-		// we make the V1 side the spilling side and the V2 side the blocking side.
-		// In order to get the full cross product without unnecessary spilling, we do the
-		// following:
-		// 1) cross the heads
-		// 2) cross the head of the spilling side against the first block of the blocking side
-		// 3) cross the iterator of the spilling side with the head of the block side
-		// 4) cross the iterator of the spilling side with the first block
-		// ---------------------------------------------------
-		// If the blocking side has more than one block, we really need to make the spilling side fully
-		// resettable. For each further block on the block side, we do:
-		// 5) cross the head of the spilling side with the next block
-		// 6) cross the spilling iterator with the next block.
-		
-		// match the first values first
-		T1 copy1 = this.serializer1.copy(firstV1);
-		T2 blockHeadCopy = this.serializer2.copy(firstV2);
-		T1 spillHeadCopy = null;
 
-
-		// --------------- 1) Cross the heads -------------------
-		matchFunction.join(copy1, firstV2, collector);
-		
-		// for the remaining values, we do a block-nested-loops join
-		SpillingResettableIterator<T1> spillIt = null;
-		
-		try {
-			// create block iterator on the second input
-			this.blockIt.reopen(blockVals);
-			
-			// ------------- 2) cross the head of the spilling side with the first block ------------------
-			while (this.blockIt.hasNext()) {
-				final T2 nextBlockRec = this.blockIt.next();
-				copy1 = this.serializer1.copy(firstV1);
-				matchFunction.join(copy1, nextBlockRec, collector);
-			}
-			this.blockIt.reset();
-			
-			// spilling is required if the blocked input has data beyond the current block.
-			// in that case, create the spilling iterator
-			final Iterator<T1> leftSideIter;
-			final boolean spillingRequired = this.blockIt.hasFurtherInput();
-			if (spillingRequired)
-			{
-				// more data than would fit into one block. we need to wrap the other side in a spilling iterator
-				// create spilling iterator on first input
-				spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
-						this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
-				leftSideIter = spillIt;
-				spillIt.open();
-				
-				spillHeadCopy = this.serializer1.copy(firstV1);
-			}
-			else {
-				leftSideIter = spillVals;
-			}
-			
-			// cross the values in the v1 iterator against the current block
-			
-			while (leftSideIter.hasNext()) {
-				final T1 nextSpillVal = leftSideIter.next();
-				copy1 = this.serializer1.copy(nextSpillVal);
-				
-				
-				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
-				T2 copy2 = this.serializer2.copy(blockHeadCopy);
-				matchFunction.join(copy1, copy2, collector);
-				
-				// -------- 4) cross the iterator of the spilling side with the first block --------
-				while (this.blockIt.hasNext()) {
-					T2 nextBlockRec = this.blockIt.next();
-					
-					// get instances of key and block value
-					copy1 = this.serializer1.copy(nextSpillVal);
-					matchFunction.join(copy1, nextBlockRec, collector);
-				}
-				// reset block iterator
-				this.blockIt.reset();
-			}
-			
-			// if everything from the block-side fit into a single block, we are done.
-			// note that in this special case, we did not create a spilling iterator at all
-			if (!spillingRequired) {
-				return;
-			}
-			
-			// here we are, because we have more blocks on the block side
-			// loop as long as there are blocks from the blocked input
-			while (this.blockIt.nextBlock())
-			{
-				// rewind the spilling iterator
-				spillIt.reset();
-				
-				// ------------- 5) cross the head of the spilling side with the next block ------------
-				while (this.blockIt.hasNext()) {
-					copy1 = this.serializer1.copy(spillHeadCopy);
-					final T2 nextBlockVal = blockIt.next();
-					matchFunction.join(copy1, nextBlockVal, collector);
-				}
-				this.blockIt.reset();
-				
-				// -------- 6) cross the spilling iterator with the next block. ------------------
-				while (spillIt.hasNext())
-				{
-					// get value from resettable iterator
-					final T1 nextSpillVal = spillIt.next();
-					// cross value with block values
-					while (this.blockIt.hasNext()) {
-						// get instances of key and block value
-						final T2 nextBlockVal = this.blockIt.next();
-						copy1 = this.serializer1.copy(nextSpillVal);
-						matchFunction.join(copy1, nextBlockVal, collector);
-					}
-					
-					// reset block iterator
-					this.blockIt.reset();
-				}
-				// reset v1 iterator
-				spillIt.reset();
-			}
-		}
-		finally {
-			if (spillIt != null) {
-				this.memoryForSpillingIterator.addAll(spillIt.close());
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
index 66beee1..c9cf5a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
@@ -18,70 +18,20 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.NonReusingBlockResettableIterator;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 
-/**
- * An implementation of the {@link JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public class ReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O> {
-	
-	/**
-	 * The log used by this iterator to log messages.
-	 */
-	private static final Logger LOG = LoggerFactory.getLogger(ReusingMergeMatchIterator.class);
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private TypePairComparator<T1, T2> comp;
-	
-	private ReusingKeyGroupedIterator<T1> iterator1;
-
-	private ReusingKeyGroupedIterator<T2> iterator2;
-	
-	private final TypeSerializer<T1> serializer1;
-	
-	private final TypeSerializer<T2> serializer2;
-	
-	private T1 copy1;
-	
-	private T1 spillHeadCopy;
-	
-	private T2 copy2;
-	
-	private T2 blockHeadCopy;
-	
-	private final NonReusingBlockResettableIterator<T2> blockIt;				// for N:M cross products with same key
-	
-	private final List<MemorySegment> memoryForSpillingIterator;
-	
-	private final MemoryManager memoryManager;
+public class ReusingMergeMatchIterator<T1, T2, O> extends AbstractMergeMatchIterator<T1, T2, O> {
 
-	private final IOManager ioManager;
-	
-	// --------------------------------------------------------------------------------------------
-	
 	public ReusingMergeMatchIterator(
 			MutableObjectIterator<T1> input1,
 			MutableObjectIterator<T2> input2,
@@ -92,344 +42,23 @@ public class ReusingMergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1
 			IOManager ioManager,
 			int numMemoryPages,
 			AbstractInvokable parentTask)
-	throws MemoryAllocationException
-	{
-		if (numMemoryPages < 2) {
-			throw new IllegalArgumentException("Merger needs at least 2 memory pages.");
-		}
-		
-		this.comp = pairComparator;
-		this.serializer1 = serializer1;
-		this.serializer2 = serializer2;
-		
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
 		this.copy1 = serializer1.createInstance();
 		this.spillHeadCopy = serializer1.createInstance();
 		this.copy2 = serializer2.createInstance();
 		this.blockHeadCopy = serializer2.createInstance();
-		
-		this.memoryManager = memoryManager;
-		this.ioManager = ioManager;
-		
-		this.iterator1 = new ReusingKeyGroupedIterator<T1>(input1, this.serializer1, comparator1.duplicate());
-		this.iterator2 = new ReusingKeyGroupedIterator<T2>(input2, this.serializer2, comparator2.duplicate());
-		
-		final int numPagesForSpiller = numMemoryPages > 20 ? 2 : 1;
-		this.blockIt = new NonReusingBlockResettableIterator<T2>(this.memoryManager, this.serializer2,
-			(numMemoryPages - numPagesForSpiller), parentTask);
-		this.memoryForSpillingIterator = memoryManager.allocatePages(parentTask, numPagesForSpiller);
-	}
-
-
-	@Override
-	public void open() throws IOException {}
-
-
-	@Override
-	public void close() {
-		if (this.blockIt != null) {
-			try {
-				this.blockIt.close();
-			}
-			catch (Throwable t) {
-				LOG.error("Error closing block memory iterator: " + t.getMessage(), t);
-			}
-		}
-		
-		this.memoryManager.release(this.memoryForSpillingIterator);
 	}
-	
 
 	@Override
-	public void abort() {
-		close();
+	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+		return new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
 	}
 
-	/**
-	 * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come 
-	 * from different inputs. The output of the <code>match()</code> method is forwarded.
-	 * <p>
-	 * This method first zig-zags between the two sorted inputs in order to find a common
-	 * key, and then calls the match stub with the cross product of the values.
-	 * 
-	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
-	 * 
-	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(FlatJoinFunction, Collector)
-	 */
 	@Override
-	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-			// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
-			while (this.iterator1.nextKey());
-			while (this.iterator2.nextKey());
-			
-			return false;
-		}
-
-		final TypePairComparator<T1, T2> comparator = this.comp;
-		comparator.setReference(this.iterator1.getCurrent());
-		T2 current2 = this.iterator2.getCurrent();
-				
-		// zig zag
-		while (true) {
-			// determine the relation between the (possibly composite) keys
-			final int comp = comparator.compareToReference(current2);
-			
-			if (comp == 0) {
-				break;
-			}
-			
-			if (comp < 0) {
-				if (!this.iterator2.nextKey()) {
-					return false;
-				}
-				current2 = this.iterator2.getCurrent();
-			}
-			else {
-				if (!this.iterator1.nextKey()) {
-					return false;
-				}
-				comparator.setReference(this.iterator1.getCurrent());
-			}
-		}
-		
-		// here, we have a common key! call the match function with the cross product of the
-		// values
-		final ReusingKeyGroupedIterator<T1>.ValuesIterator values1 = this.iterator1.getValues();
-		final ReusingKeyGroupedIterator<T2>.ValuesIterator values2 = this.iterator2.getValues();
-		
-		final T1 firstV1 = values1.next();
-		final T2 firstV2 = values2.next();	
-			
-		final boolean v1HasNext = values1.hasNext();
-		final boolean v2HasNext = values2.hasNext();
-
-		// check if one side is already empty
-		// this check could be omitted if we put this in MatchTask.
-		// then we can derive the local strategy (with build side).
-		
-		if (v1HasNext) {
-			if (v2HasNext) {
-				// both sides contain more than one value
-				// TODO: Decide which side to spill and which to block!
-				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
-			} else {
-				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
-			}
-		} else {
-			if (v2HasNext) {
-				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
-			} else {
-				// both sides contain only one value
-				matchFunction.join(firstV1, firstV2, collector);
-			}
-		}
-		return true;
+	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+		return serializer.copy(value, reuse);
 	}
 
-	/**
-	 * Crosses a single value from the first input with N values, all sharing a common key.
-	 * Effectively realizes a <i>1:N</i> match (join).
-	 * 
-	 * @param val1 The value form the <i>1</i> side.
-	 * @param firstValN The first of the values from the <i>N</i> side.
-	 * @param valsN Iterator over remaining <i>N</i> side values.
-	 *          
-	 * @throws Exception Forwards all exceptions thrown by the stub.
-	 */
-	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-			final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		this.copy1 = this.serializer1.copy(val1, this.copy1);
-		matchFunction.join(this.copy1, firstValN, collector);
-		
-		// set copy and match first element
-		boolean more = true;
-		do {
-			final T2 nRec = valsN.next();
-			
-			if (valsN.hasNext()) {
-				this.copy1 = this.serializer1.copy(val1, this.copy1);
-				matchFunction.join(this.copy1, nRec, collector);
-			} else {
-				matchFunction.join(val1, nRec, collector);
-				more = false;
-			}
-		}
-		while (more);
-	}
-	
-	/**
-	 * Crosses a single value from the second side with N values, all sharing a common key.
-	 * Effectively realizes a <i>N:1</i> match (join).
-	 * 
-	 * @param val1 The value form the <i>1</i> side.
-	 * @param firstValN The first of the values from the <i>N</i> side.
-	 * @param valsN Iterator over remaining <i>N</i> side values.
-	 *          
-	 * @throws Exception Forwards all exceptions thrown by the stub.
-	 */
-	private void crossSecond1withNValues(T2 val1, T1 firstValN,
-			Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		this.copy2 = this.serializer2.copy(val1, this.copy2);
-		matchFunction.join(firstValN, this.copy2, collector);
-		
-		// set copy and match first element
-		boolean more = true;
-		do {
-			final T1 nRec = valsN.next();
-			
-			if (valsN.hasNext()) {
-				this.copy2 = this.serializer2.copy(val1, this.copy2);
-				matchFunction.join(nRec,this.copy2,collector);
-			} else {
-				matchFunction.join(nRec, val1, collector);
-				more = false;
-			}
-		}
-		while (more);
-	}
-	
-	/**
-	 * @param firstV1
-	 * @param spillVals
-	 * @param firstV2
-	 * @param blockVals
-	 */
-	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
-			final T2 firstV2, final Iterator<T2> blockVals,
-			final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-	throws Exception
-	{
-		// ==================================================
-		// We have one first (head) element from both inputs (firstV1 and firstV2)
-		// We have an iterator for both inputs.
-		// we make the V1 side the spilling side and the V2 side the blocking side.
-		// In order to get the full cross product without unnecessary spilling, we do the
-		// following:
-		// 1) cross the heads
-		// 2) cross the head of the spilling side against the first block of the blocking side
-		// 3) cross the iterator of the spilling side with the head of the block side
-		// 4) cross the iterator of the spilling side with the first block
-		// ---------------------------------------------------
-		// If the blocking side has more than one block, we really need to make the spilling side fully
-		// resettable. For each further block on the block side, we do:
-		// 5) cross the head of the spilling side with the next block
-		// 6) cross the spilling iterator with the next block.
-		
-		// match the first values first
-		this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-		this.blockHeadCopy = this.serializer2.copy(firstV2, this.blockHeadCopy);
-		
-		// --------------- 1) Cross the heads -------------------
-		matchFunction.join(this.copy1, firstV2, collector);
-		
-		// for the remaining values, we do a block-nested-loops join
-		SpillingResettableIterator<T1> spillIt = null;
-		
-		try {
-			// create block iterator on the second input
-			this.blockIt.reopen(blockVals);
-			
-			// ------------- 2) cross the head of the spilling side with the first block ------------------
-			while (this.blockIt.hasNext()) {
-				final T2 nextBlockRec = this.blockIt.next();
-				this.copy1 = this.serializer1.copy(firstV1, this.copy1);
-				matchFunction.join(this.copy1, nextBlockRec, collector);
-			}
-			this.blockIt.reset();
-			
-			// spilling is required if the blocked input has data beyond the current block.
-			// in that case, create the spilling iterator
-			final Iterator<T1> leftSideIter;
-			final boolean spillingRequired = this.blockIt.hasFurtherInput();
-			if (spillingRequired)
-			{
-				// more data than would fit into one block. we need to wrap the other side in a spilling iterator
-				// create spilling iterator on first input
-				spillIt = new SpillingResettableIterator<T1>(spillVals, this.serializer1,
-						this.memoryManager, this.ioManager, this.memoryForSpillingIterator);
-				leftSideIter = spillIt;
-				spillIt.open();
-				
-				this.spillHeadCopy = this.serializer1.copy(firstV1, this.spillHeadCopy);
-			}
-			else {
-				leftSideIter = spillVals;
-			}
-			
-			// cross the values in the v1 iterator against the current block
-			
-			while (leftSideIter.hasNext()) {
-				final T1 nextSpillVal = leftSideIter.next();
-				this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-				
-				
-				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
-				this.copy2 = this.serializer2.copy(this.blockHeadCopy, this.copy2);
-				matchFunction.join(this.copy1, this.copy2, collector);
-				
-				// -------- 4) cross the iterator of the spilling side with the first block --------
-				while (this.blockIt.hasNext()) {
-					T2 nextBlockRec = this.blockIt.next();
-					
-					// get instances of key and block value
-					this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-					matchFunction.join(this.copy1, nextBlockRec, collector);
-				}
-				// reset block iterator
-				this.blockIt.reset();
-			}
-			
-			// if everything from the block-side fit into a single block, we are done.
-			// note that in this special case, we did not create a spilling iterator at all
-			if (!spillingRequired) {
-				return;
-			}
-			
-			// here we are, because we have more blocks on the block side
-			// loop as long as there are blocks from the blocked input
-			while (this.blockIt.nextBlock())
-			{
-				// rewind the spilling iterator
-				spillIt.reset();
-				
-				// ------------- 5) cross the head of the spilling side with the next block ------------
-				while (this.blockIt.hasNext()) {
-					this.copy1 = this.serializer1.copy(this.spillHeadCopy, this.copy1);
-					final T2 nextBlockVal = blockIt.next();
-					matchFunction.join(this.copy1, nextBlockVal, collector);
-				}
-				this.blockIt.reset();
-				
-				// -------- 6) cross the spilling iterator with the next block. ------------------
-				while (spillIt.hasNext())
-				{
-					// get value from resettable iterator
-					final T1 nextSpillVal = spillIt.next();
-					// cross value with block values
-					while (this.blockIt.hasNext()) {
-						// get instances of key and block value
-						final T2 nextBlockVal = this.blockIt.next();
-						this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1);
-						matchFunction.join(this.copy1, nextBlockVal, collector);
-					}
-					
-					// reset block iterator
-					this.blockIt.reset();
-				}
-				// reset v1 iterator
-				spillIt.reset();
-			}
-		}
-		finally {
-			if (spillIt != null) {
-				this.memoryForSpillingIterator.addAll(spillIt.close());
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
new file mode 100644
index 0000000..64e8298
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface KeyGroupedIterator<E> {
+
+	boolean nextKey() throws IOException;
+
+	E getCurrent();
+
+	Iterator<E> getValues();
+}
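
The new interface is intentionally small: callers only advance from key group to key group and iterate the values of the current group. A short usage sketch, with an illustrative helper name:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.flink.runtime.util.KeyGroupedIterator;

    // Sketch: consuming a KeyGroupedIterator group by group; whether the concrete
    // type reuses objects or not no longer matters to the caller.
    final class KeyGroupedIterationSketch {

        static <E> long countRecords(KeyGroupedIterator<E> groups) throws IOException {
            long count = 0;
            while (groups.nextKey()) {                    // advance to the next key group
                Iterator<E> values = groups.getValues();  // all values sharing that key
                while (values.hasNext()) {
                    values.next();
                    count++;
                }
            }
            return count;
        }
    }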

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
index 3f28cfc..6f4448c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonReusingKeyGroupedIterator.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.TraversableOnceException;
 /**
  * The key grouped iterator returns a key and all values that share the same key.
  */
-public final class NonReusingKeyGroupedIterator<E> {
+public final class NonReusingKeyGroupedIterator<E> implements KeyGroupedIterator<E> {
 	
 	private final MutableObjectIterator<E> iterator;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/0dc6849a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
index 4dc9dd3..1477f10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ReusingKeyGroupedIterator.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.TraversableOnceException;
  * The KeyValueIterator returns a key and all values that belong to the key (share the same key).
  * 
  */
-public final class ReusingKeyGroupedIterator<E> {
+public final class ReusingKeyGroupedIterator<E> implements KeyGroupedIterator<E> {
 	
 	private final MutableObjectIterator<E> iterator;
 
@@ -78,6 +78,7 @@ public final class ReusingKeyGroupedIterator<E> {
 	 * 
 	 * @return true if the input iterator has an other group of key-value pairs that share the same key.
 	 */
+	@Override
 	public boolean nextKey() throws IOException
 	{
 		// first element (or empty)
@@ -139,6 +140,7 @@ public final class ReusingKeyGroupedIterator<E> {
 		return this.comparator;
 	}
 	
+	@Override
 	public E getCurrent() {
 		return this.current;
 	}
@@ -150,6 +152,7 @@ public final class ReusingKeyGroupedIterator<E> {
 	 * 
 	 * @return Iterator over all values that belong to the current key.
 	 */
+	@Override
 	public ValuesIterator getValues() {
 		return this.valuesIterator;
 	}


[5/5] flink git commit: [FLINK-2105] Implement Sort-Merge Outer Join algorithm

Posted by fh...@apache.org.
[FLINK-2105] Implement Sort-Merge Outer Join algorithm

This closes #907


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/941ac6df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/941ac6df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/941ac6df

Branch: refs/heads/master
Commit: 941ac6dfd446d8e97e2fe2f589164978602adf94
Parents: df9f481
Author: r-pogalz <r....@campus.tu-berlin.de>
Authored: Mon Aug 3 12:59:48 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 23:51:27 2015 +0200

----------------------------------------------------------------------
 .../operators/sort/AbstractMergeIterator.java   |  58 +--
 .../sort/AbstractMergeOuterJoinIterator.java    | 189 ++++++++
 .../sort/NonReusingMergeOuterJoinIterator.java  |  60 +++
 .../sort/ReusingMergeOuterJoinIterator.java     |  63 +++
 ...bstractSortMergeOuterJoinIteratorITCase.java | 462 +++++++++++++++++++
 ...ReusingSortMergeInnerJoinIteratorITCase.java |   4 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |  82 ++++
 ...ReusingSortMergeInnerJoinIteratorITCase.java |   4 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |  82 ++++
 .../runtime/operators/testutils/Match.java      |   2 +-
 .../testutils/MatchRemovingJoiner.java          |  58 +++
 .../testutils/MatchRemovingMatcher.java         |  58 ---
 12 files changed, 1030 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
index 9a61c14..c01afc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java
@@ -115,20 +115,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 	}
 
 	/**
-	 * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
-	 * from different inputs. The output of the <code>match()</code> method is forwarded.
+	 * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come
+	 * from different inputs. The output of the <code>join()</code> method is forwarded.
 	 * <p>
 	 * This method first zig-zags between the two sorted inputs in order to find a common
-	 * key, and then calls the match stub with the cross product of the values.
+	 * key, and then calls the join stub with the cross product of the values.
 	 *
 	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
 	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
 	 */
 	@Override
-	public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+	public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
 			throws Exception;
 
-	protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+	protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
 		final T1 firstV1 = values1.next();
 		final T2 firstV2 = values2.next();
 
@@ -143,23 +143,23 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 			if (v2HasNext) {
 				// both sides contain more than one value
 				// TODO: Decide which side to spill and which to block!
-				crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector);
+				crossMwithNValues(firstV1, values1, firstV2, values2, joinFunction, collector);
 			} else {
-				crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector);
+				crossSecond1withNValues(firstV2, firstV1, values1, joinFunction, collector);
 			}
 		} else {
 			if (v2HasNext) {
-				crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector);
+				crossFirst1withNValues(firstV1, firstV2, values2, joinFunction, collector);
 			} else {
 				// both sides contain only one value
-				matchFunction.join(firstV1, firstV2, collector);
+				joinFunction.join(firstV1, firstV2, collector);
 			}
 		}
 	}
 
 	/**
 	 * Crosses a single value from the first input with N values, all sharing a common key.
-	 * Effectively realizes a <i>1:N</i> match (join).
+	 * Effectively realizes a <i>1:N</i> join.
 	 *
 	 * @param val1      The value from the <i>1</i> side.
 	 * @param firstValN The first of the values from the <i>N</i> side.
@@ -167,21 +167,21 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 	 * @throws Exception Forwards all exceptions thrown by the stub.
 	 */
 	private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-										final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
+										final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
 			throws Exception {
 		T1 copy1 = createCopy(serializer1, val1, this.copy1);
-		matchFunction.join(copy1, firstValN, collector);
+		joinFunction.join(copy1, firstValN, collector);
 
-		// set copy and match first element
+		// set copy and join first element
 		boolean more = true;
 		do {
 			final T2 nRec = valsN.next();
 
 			if (valsN.hasNext()) {
 				copy1 = createCopy(serializer1, val1, this.copy1);
-				matchFunction.join(copy1, nRec, collector);
+				joinFunction.join(copy1, nRec, collector);
 			} else {
-				matchFunction.join(val1, nRec, collector);
+				joinFunction.join(val1, nRec, collector);
 				more = false;
 			}
 		}
@@ -190,7 +190,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 
 	/**
 	 * Crosses a single value from the second side with N values, all sharing a common key.
-	 * Effectively realizes a <i>N:1</i> match (join).
+	 * Effectively realizes a <i>N:1</i> join.
 	 *
 	 * @param val1      The value from the <i>1</i> side.
 	 * @param firstValN The first of the values from the <i>N</i> side.
@@ -198,20 +198,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 	 * @throws Exception Forwards all exceptions thrown by the stub.
 	 */
 	private void crossSecond1withNValues(T2 val1, T1 firstValN,
-										Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception {
+										Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
 		T2 copy2 = createCopy(serializer2, val1, this.copy2);
-		matchFunction.join(firstValN, copy2, collector);
+		joinFunction.join(firstValN, copy2, collector);
 
-		// set copy and match first element
+		// set copy and join first element
 		boolean more = true;
 		do {
 			final T1 nRec = valsN.next();
 
 			if (valsN.hasNext()) {
 				copy2 = createCopy(serializer2, val1, this.copy2);
-				matchFunction.join(nRec, copy2, collector);
+				joinFunction.join(nRec, copy2, collector);
 			} else {
-				matchFunction.join(nRec, val1, collector);
+				joinFunction.join(nRec, val1, collector);
 				more = false;
 			}
 		}
@@ -220,7 +220,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 
 	private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
 									final T2 firstV2, final Iterator<T2> blockVals,
-									final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+									final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception {
 		// ==================================================
 		// We have one first (head) element from both inputs (firstV1 and firstV2)
 		// We have an iterator for both inputs.
@@ -237,13 +237,13 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 		// 5) cross the head of the spilling side with the next block
 		// 6) cross the spilling iterator with the next block.
 
-		// match the first values first
+		// join the first values first
 		T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1);
 		T2 blockHeadCopy = this.createCopy(serializer2, firstV2, this.blockHeadCopy);
 		T1 spillHeadCopy = null;
 
 		// --------------- 1) Cross the heads -------------------
-		matchFunction.join(copy1, firstV2, collector);
+		joinFunction.join(copy1, firstV2, collector);
 
 		// for the remaining values, we do a block-nested-loops join
 		SpillingResettableIterator<T1> spillIt = null;
@@ -256,7 +256,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 			while (this.blockIt.hasNext()) {
 				final T2 nextBlockRec = this.blockIt.next();
 				copy1 = this.createCopy(serializer1, firstV1, this.copy1);
-				matchFunction.join(copy1, nextBlockRec, collector);
+				joinFunction.join(copy1, nextBlockRec, collector);
 			}
 			this.blockIt.reset();
 
@@ -286,7 +286,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 
 				// -------- 3) cross the iterator of the spilling side with the head of the block side --------
 				T2 copy2 = this.createCopy(serializer2, blockHeadCopy, this.copy2);
-				matchFunction.join(copy1, copy2, collector);
+				joinFunction.join(copy1, copy2, collector);
 
 				// -------- 4) cross the iterator of the spilling side with the first block --------
 				while (this.blockIt.hasNext()) {
@@ -294,7 +294,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 
 					// get instances of key and block value
 					copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
-					matchFunction.join(copy1, nextBlockRec, collector);
+					joinFunction.join(copy1, nextBlockRec, collector);
 				}
 				// reset block iterator
 				this.blockIt.reset();
@@ -316,7 +316,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 				while (this.blockIt.hasNext()) {
 					copy1 = this.createCopy(serializer1, spillHeadCopy, this.copy1);
 					final T2 nextBlockVal = blockIt.next();
-					matchFunction.join(copy1, nextBlockVal, collector);
+					joinFunction.join(copy1, nextBlockVal, collector);
 				}
 				this.blockIt.reset();
 
@@ -329,7 +329,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat
 						// get instances of key and block value
 						final T2 nextBlockVal = this.blockIt.next();
 						copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1);
-						matchFunction.join(copy1, nextBlockVal, collector);
+						joinFunction.join(copy1, nextBlockVal, collector);
 					}
 
 					// reset block iterator
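
Conceptually, once the zig-zag described above has positioned both inputs on a common key, the crossing helpers simply emit the cross product of the two value groups; the block-nested-loops path in crossMwithNValues() only exists to produce that same result when the groups do not fit into memory. A toy, Flink-independent illustration of the output shape (the values are made up):

import java.util.Arrays;
import java.util.List;

public class CrossProductSketch {
	public static void main(String[] args) {
		// Two value groups that share one key: 2 left values x 3 right values = 6 joined pairs.
		List<String> leftGroup = Arrays.asList("L1", "L2");
		List<Integer> rightGroup = Arrays.asList(10, 20, 30);

		for (String left : leftGroup) {
			for (Integer right : rightGroup) {
				System.out.println(left + " joined with " + right);
			}
		}
	}
}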

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
new file mode 100644
index 0000000..01b371e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+	public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+	private final OuterJoinType outerJoinType;
+
+	private boolean initialized = false;
+	private boolean it1Empty = false;
+	private boolean it2Empty = false;
+
+
+	public AbstractMergeOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
+		this.outerJoinType = outerJoinType;
+	}
+
+	/**
+	 * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come
+	 * from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no
+	 * matching partner from the other input exists are joined with null.
+	 * The output of the <code>join()</code> method is forwarded.
+	 *
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+	 */
+	@Override
+	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception {
+		if (!initialized) {
+			//first run, set iterators to first elements
+			it1Empty = !this.iterator1.nextKey();
+			it2Empty = !this.iterator2.nextKey();
+			initialized = true;
+		}
+
+		if (it1Empty && it2Empty) {
+			return false;
+		} else if (it2Empty) {
+			if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
+				joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+				it1Empty = !iterator1.nextKey();
+				return true;
+			} else {
+				//consume rest of left side
+				while (iterator1.nextKey()) ;
+				it1Empty = true;
+				return false;
+			}
+		} else if (it1Empty) {
+			if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+				joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+				it2Empty = !iterator2.nextKey();
+				return true;
+			} else {
+				//consume rest of right side
+				while (iterator2.nextKey()) ;
+				it2Empty = true;
+				return false;
+			}
+		} else {
+			final TypePairComparator<T1, T2> comparator = super.pairComparator;
+			comparator.setReference(this.iterator1.getCurrent());
+			T2 current2 = this.iterator2.getCurrent();
+
+			// zig zag
+			while (true) {
+				// determine the relation between the (possibly composite) keys
+				final int comp = comparator.compareToReference(current2);
+
+				if (comp == 0) {
+					break;
+				}
+
+				if (comp < 0) {
+					//right key < left key
+					if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+						//join right key values with null in case of right or full outer join
+						joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+						it2Empty = !iterator2.nextKey();
+						return true;
+					} else {
+						//skip this right key if it is a left outer join
+						if (!this.iterator2.nextKey()) {
+							//if right side is empty, join current left key values with null
+							joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+							it1Empty = !iterator1.nextKey();
+							it2Empty = true;
+							return true;
+						}
+						current2 = this.iterator2.getCurrent();
+					}
+				} else {
+					//right key > left key
+					if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
+						//join left key values with null in case of left or full outer join
+						joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector);
+						it1Empty = !iterator1.nextKey();
+						return true;
+					} else {
+						//skip this left key if it is a right outer join
+						if (!this.iterator1.nextKey()) {
+							//if left side is empty, join current right key values with null
+							joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector);
+							it1Empty = true;
+							it2Empty = !iterator2.nextKey();
+							return true;
+						}
+						comparator.setReference(this.iterator1.getCurrent());
+					}
+				}
+			}
+
+			// here, we have a common key! call the join function with the cross product of the
+			// values
+			final Iterator<T1> values1 = this.iterator1.getValues();
+			final Iterator<T2> values2 = this.iterator2.getValues();
+
+			crossMatchingGroup(values1, values2, joinFunction, collector);
+			it1Empty = !iterator1.nextKey();
+			it2Empty = !iterator2.nextKey();
+			return true;
+		}
+	}
+
+	private void joinLeftKeyValuesWithNull(Iterator<T1> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
+		while (values.hasNext()) {
+			T1 next = values.next();
+			this.copy1 = createCopy(serializer1, next, copy1);
+			joinFunction.join(copy1, null, collector);
+		}
+	}
+
+	private void joinRightKeyValuesWithNull(Iterator<T2> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception {
+		while (values.hasNext()) {
+			T2 next = values.next();
+			this.copy2 = createCopy(serializer2, next, copy2);
+			joinFunction.join(null, copy2, collector);
+		}
+	}
+
+}
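
Because unmatched groups are joined with null, any FlatJoinFunction passed to this iterator must tolerate a null record on either input. A minimal, illustrative joiner for the tuple types used in the tests further below (the class name is made up, and the driver snippet assumes an already constructed iterator and collector):

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class NullSafeTupleJoiner implements
		FlatJoinFunction<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Integer>> {

	@Override
	public void join(Tuple2<String, String> left, Tuple2<String, Integer> right,
			Collector<Tuple4<String, String, String, Integer>> out) {
		// Either side may be null for LEFT, RIGHT and FULL outer joins, so both are null-checked.
		out.collect(new Tuple4<String, String, String, Integer>(
				left == null ? null : left.f0,
				left == null ? null : left.f1,
				right == null ? null : right.f0,
				right == null ? null : right.f1));
	}
}

// 'outerJoinIterator' and 'collector' are assumed to be set up as in the tests below.
outerJoinIterator.open();
while (outerJoinIterator.callWithNextKey(new NullSafeTupleJoiner(), collector)) {
	// each call processes the next key: matched groups are crossed, unmatched groups are padded with null
}
outerJoinIterator.close();

As a small worked example, with left keys {1, 3} and right keys {2, 3, 4} a FULL outer join emits (1, null), (null, 2), (3, 3) and (null, 4), while a LEFT outer join emits only (1, null) and (3, 3).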

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
new file mode 100644
index 0000000..ac49ece
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class NonReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {
+
+	public NonReusingMergeOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	@Override
+	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+		return new NonReusingKeyGroupedIterator<T>(input, comparator);
+	}
+
+	@Override
+	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+		return serializer.copy(value);
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
new file mode 100644
index 0000000..0cefbc5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class ReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> {
+
+	public ReusingMergeOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
+		this.copy1 = serializer1.createInstance();
+		this.spillHeadCopy = serializer1.createInstance();
+		this.copy2 = serializer2.createInstance();
+		this.blockHeadCopy = serializer2.createInstance();
+	}
+
+	@Override
+	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+		return new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
+	}
+
+	@Override
+	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { return serializer.copy(value, reuse); }
+
+
+}
\ No newline at end of file
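
The Reusing and NonReusing variants above differ only in which KeyGroupedIterator they instantiate and in the copy strategy behind createCopy(). A small standalone sketch of the two copy strategies, built with the same basic tuple type info as the test below (the class name and sample values are illustrative):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class CopyStrategiesSketch {
	public static void main(String[] args) {
		ExecutionConfig config = new ExecutionConfig();
		TupleTypeInfo<Tuple2<String, Integer>> typeInfo =
				TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);
		TypeSerializer<Tuple2<String, Integer>> serializer = typeInfo.createSerializer(config);

		Tuple2<String, Integer> original = new Tuple2<String, Integer>("Zed", 150);
		Tuple2<String, Integer> reuse = serializer.createInstance();

		// Non-reusing strategy: every copy is a fresh, independent instance. Safe if
		// downstream code holds on to the emitted objects.
		Tuple2<String, Integer> fresh = serializer.copy(original);

		// Reusing strategy: the contents are copied into the pre-allocated 'reuse'
		// instance, avoiding per-record allocations. Callers must not keep references
		// to earlier results.
		Tuple2<String, Integer> recycled = serializer.copy(original, reuse);

		System.out.println(fresh.equals(original));     // true, but a separate object
		System.out.println(recycled.equals(original));  // true as well
	}
}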

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..1fbe025
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleConstantValueIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+public abstract class AbstractSortMergeOuterJoinIteratorITCase {
+
+	// total memory
+	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+	private static final int PAGES_FOR_BNLJN = 2;
+
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 20000;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+
+	// dummy abstract task
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private TupleTypeInfo<Tuple2<String, String>> typeInfo1;
+	private TupleTypeInfo<Tuple2<String, Integer>> typeInfo2;
+	private TupleSerializer<Tuple2<String, String>> serializer1;
+	private TupleSerializer<Tuple2<String, Integer>> serializer2;
+	private TypeComparator<Tuple2<String, String>> comparator1;
+	private TypeComparator<Tuple2<String, Integer>> comparator2;
+	private TypePairComparator<Tuple2<String, String>, Tuple2<String, Integer>> pairComp;
+
+
+	@Before
+	public void beforeTest() {
+		ExecutionConfig config = new ExecutionConfig();
+		config.disableObjectReuse();
+
+		typeInfo1 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+		typeInfo2 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);
+		serializer1 = typeInfo1.createSerializer(config);
+		serializer2 = typeInfo2.createSerializer(config);
+		comparator1 = typeInfo1.createComparator(new int[]{0}, new boolean[]{true}, 0, config);
+		comparator2 = typeInfo2.createComparator(new int[]{0}, new boolean[]{true}, 0, config);
+		pairComp = new GenericPairComparator<Tuple2<String, String>, Tuple2<String, Integer>>(comparator1, comparator2);
+
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+					this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+	protected void testFullOuterWithSample() throws Exception {
+		CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+				new Tuple2<String, String>("Jack", "Engineering"),
+				new Tuple2<String, String>("Tim", "Sales"),
+				new Tuple2<String, String>("Zed", "HR")
+		);
+		CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+				new Tuple2<String, Integer>("Allison", 100),
+				new Tuple2<String, Integer>("Jack", 200),
+				new Tuple2<String, Integer>("Zed", 150),
+				new Tuple2<String, Integer>("Zed", 250)
+		);
+
+		OuterJoinType outerJoinType = OuterJoinType.FULL;
+		List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, outerJoinType);
+
+		List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+				new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+				new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+				new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+				new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+				new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+		);
+
+		Assert.assertEquals(expected, actual);
+	}
+
+	protected void testLeftOuterWithSample() throws Exception {
+		CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+				new Tuple2<String, String>("Jack", "Engineering"),
+				new Tuple2<String, String>("Tim", "Sales"),
+				new Tuple2<String, String>("Zed", "HR")
+		);
+		CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+				new Tuple2<String, Integer>("Allison", 100),
+				new Tuple2<String, Integer>("Jack", 200),
+				new Tuple2<String, Integer>("Zed", 150),
+				new Tuple2<String, Integer>("Zed", 250)
+		);
+
+		List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+
+		List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+				new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+				new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+				new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+				new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+		);
+
+		Assert.assertEquals(expected, actual);
+	}
+
+	protected void testRightOuterWithSample() throws Exception {
+		CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+				new Tuple2<String, String>("Jack", "Engineering"),
+				new Tuple2<String, String>("Tim", "Sales"),
+				new Tuple2<String, String>("Zed", "HR")
+		);
+		CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+				new Tuple2<String, Integer>("Allison", 100),
+				new Tuple2<String, Integer>("Jack", 200),
+				new Tuple2<String, Integer>("Zed", 150),
+				new Tuple2<String, Integer>("Zed", 250)
+		);
+
+		List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+
+		List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+				new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+				new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200),
+				new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150),
+				new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250)
+		);
+
+		Assert.assertEquals(expected, actual);
+	}
+
+	protected void testRightSideEmpty() throws Exception {
+		CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(
+				new Tuple2<String, String>("Jack", "Engineering"),
+				new Tuple2<String, String>("Tim", "Sales"),
+				new Tuple2<String, String>("Zed", "HR")
+		);
+		CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of();
+
+		List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+		List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+		List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+		List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+				new Tuple4<String, String, String, Object>("Jack", "Engineering", null, null),
+				new Tuple4<String, String, String, Object>("Tim", "Sales", null, null),
+				new Tuple4<String, String, String, Object>("Zed", "HR", null, null)
+		);
+
+		Assert.assertEquals(expected, actualLeft);
+		Assert.assertEquals(expected, actualFull);
+		Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualRight);
+	}
+
+	protected void testLeftSideEmpty() throws Exception {
+		CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of();
+		CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(
+				new Tuple2<String, Integer>("Allison", 100),
+				new Tuple2<String, Integer>("Jack", 200),
+				new Tuple2<String, Integer>("Zed", 150),
+				new Tuple2<String, Integer>("Zed", 250)
+		);
+
+		List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT);
+		List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT);
+		List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL);
+
+		List<Tuple4<String, String, String, Object>> expected = Arrays.asList(
+				new Tuple4<String, String, String, Object>(null, null, "Allison", 100),
+				new Tuple4<String, String, String, Object>(null, null, "Jack", 200),
+				new Tuple4<String, String, String, Object>(null, null, "Zed", 150),
+				new Tuple4<String, String, String, Object>(null, null, "Zed", 250)
+		);
+
+		Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualLeft);
+		Assert.assertEquals(expected, actualRight);
+		Assert.assertEquals(expected, actualFull);
+	}
+
+	private List<Tuple4<String, String, String, Object>> computeOuterJoin(ResettableMutableObjectIterator<Tuple2<String, String>> input1,
+																		  ResettableMutableObjectIterator<Tuple2<String, Integer>> input2,
+																		  OuterJoinType outerJoinType) throws Exception {
+		input1.reset();
+		input2.reset();
+		AbstractMergeOuterJoinIterator<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Object>> iterator =
+				createOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2,
+						pairComp, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+		List<Tuple4<String, String, String, Object>> actual = new ArrayList<Tuple4<String, String, String, Object>>();
+		ListCollector<Tuple4<String, String, String, Object>> collector = new ListCollector<Tuple4<String, String, String, Object>>(actual);
+		while (iterator.callWithNextKey(new SimpleTupleJoinFunction(), collector)) ;
+		iterator.close();
+
+		return actual;
+	}
+
+	protected void testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType outerJoinType, int input1Size, int input1Duplicates, int input1ValueLength,
+														float input1KeyDensity, int input2Size, int input2Duplicates, int input2ValueLength, float input2KeyDensity) {
+		TypeSerializer<Tuple2<Integer, String>> serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		TypeSerializer<Tuple2<Integer, String>> serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		TypeComparator<Tuple2<Integer, String>> comparator1 =  new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+		TypeComparator<Tuple2<Integer, String>> comparator2 =  new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+
+		TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator =
+				new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
+
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+
+		final int DUPLICATE_KEY = 13;
+
+		try {
+			final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, input1KeyDensity, input1ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, input2KeyDensity, input2ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null);
+
+			final TupleGeneratorIterator gen1Iter = new TupleGeneratorIterator(generator1, input1Size);
+			final TupleGeneratorIterator gen2Iter = new TupleGeneratorIterator(generator2, input2Size);
+
+			final TupleConstantValueIterator const1Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", input1Duplicates);
+			final TupleConstantValueIterator const2Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", input2Duplicates);
+
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+
+			// collect expected data
+			final Map<Integer, Collection<Match>> expectedMatchesMap = joinValues(
+					collectData(input1),
+					collectData(input2),
+					outerJoinType);
+
+			// re-create the whole thing for actual processing
+
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+
+			input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+					new MatchRemovingJoiner(expectedMatchesMap);
+
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+
+
+			// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
+			// needs to spill for the duplicate keys
+			AbstractMergeOuterJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+					createOuterJoinIterator(
+							outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2,
+							pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+			iterator.open();
+
+			while (iterator.callWithNextKey(joinFunction, collector)) ;
+
+			iterator.close();
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	protected abstract <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType,
+																			  MutableObjectIterator<T1> input1,
+																			  MutableObjectIterator<T2> input2,
+																			  TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+																			  TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+																			  TypePairComparator<T1, T2> pairComparator,
+																			  MemoryManager memoryManager,
+																			  IOManager ioManager,
+																			  int numMemoryPages,
+																			  AbstractInvokable parentTask) throws Exception;
+
+	// --------------------------------------------------------------------------------------------
+	//                                    Utilities
+	// --------------------------------------------------------------------------------------------
+
+
+	private Map<Integer, Collection<Match>> joinValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap,
+			OuterJoinType outerJoinType) {
+		Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>();
+
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+
+			if (outerJoinType == OuterJoinType.RIGHT && rightValues == null) {
+				continue;
+			}
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<Match>());
+			}
+
+			Collection<Match> joinedValues = map.get(key);
+
+			for (String leftValue : leftValues) {
+				if (rightValues != null) {
+					for (String rightValue : rightValues) {
+						joinedValues.add(new Match(leftValue, rightValue));
+					}
+				} else {
+					joinedValues.add(new Match(leftValue, null));
+				}
+			}
+		}
+
+		if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) {
+			for (Integer key : rightMap.keySet()) {
+				Collection<String> leftValues = leftMap.get(key);
+				Collection<String> rightValues = rightMap.get(key);
+
+				if (leftValues != null) {
+					continue;
+				}
+
+				if (!map.containsKey(key)) {
+					map.put(key, new ArrayList<Match>());
+				}
+
+				Collection<Match> joinedValues = map.get(key);
+
+				for (String rightValue : rightValues) {
+					joinedValues.add(new Match(null, rightValue));
+				}
+			}
+		}
+
+		return map;
+	}
+
+
+	private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+			throws Exception {
+		final Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>();
+		Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
+
+		while ((pair = iter.next(pair)) != null) {
+			final Integer key = pair.getField(0);
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<String>());
+			}
+
+			Collection<String> values = map.get(key);
+			final String value = pair.getField(1);
+			values.add(value);
+		}
+
+		return map;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
index 7fc3734..6548052 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
 					collectData(input2));
 
 			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
-					new MatchRemovingMatcher(expectedMatchesMap);
+					new MatchRemovingJoiner(expectedMatchesMap);
 
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	
@@ -226,7 +226,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase {
 			input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
 			input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
 			
-			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingMatcher(expectedMatchesMap);
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingJoiner(expectedMatchesMap);
 			
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..1205bc1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+public class NonReusingSortMergeOuterJoinIteratorITCase  extends AbstractSortMergeOuterJoinIteratorITCase {
+
+	@Override
+	protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType, MutableObjectIterator<T1> input1,
+																			  MutableObjectIterator<T2> input2, TypeSerializer<T1> serializer1,
+																			  TypeComparator<T1> comparator1, TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+																			  TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager,
+																			  int numMemoryPages, AbstractInvokable parentTask) throws Exception {
+		return new NonReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1,
+				serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	@Test
+	public void testFullOuterWithSample() throws Exception {
+		super.testFullOuterWithSample();
+	}
+
+	@Test
+	public void testLeftOuterWithSample() throws Exception {
+		super.testLeftOuterWithSample();
+	}
+
+	@Test
+	public void testRightOuterWithSample() throws Exception {
+		super.testRightOuterWithSample();
+	}
+
+	@Test
+	public void testRightSideEmpty() throws Exception {
+		super.testRightSideEmpty();
+	}
+
+	@Test
+	public void testLeftSideEmpty() throws Exception {
+		super.testLeftSideEmpty();
+	}
+
+	@Test
+	public void testFullOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f);
+	}
+
+	@Test
+	public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f);
+	}
+
+	@Test
+	public void testRightOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
index e4eec86..39316e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -135,7 +135,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
 				collectData(input2));
 
 			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
-					new MatchRemovingMatcher(expectedMatchesMap);
+					new MatchRemovingJoiner(expectedMatchesMap);
 
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 
@@ -226,7 +226,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase {
 			input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
 			input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
 			
-			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingMatcher(expectedMatchesMap);
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingJoiner(expectedMatchesMap);
 			
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
new file mode 100644
index 0000000..b4fbd80
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+public class ReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase {
+
+	@Override
+	protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType, MutableObjectIterator<T1> input1,
+																			  MutableObjectIterator<T2> input2, TypeSerializer<T1> serializer1,
+																			  TypeComparator<T1> comparator1, TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+																			  TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager,
+																			  int numMemoryPages, AbstractInvokable parentTask) throws Exception {
+		return new ReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1,
+				serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	@Test
+	public void testFullOuterWithSample() throws Exception {
+		super.testFullOuterWithSample();
+	}
+
+	@Test
+	public void testLeftOuterWithSample() throws Exception {
+		super.testLeftOuterWithSample();
+	}
+
+	@Test
+	public void testRightOuterWithSample() throws Exception {
+		super.testRightOuterWithSample();
+	}
+
+	@Test
+	public void testRightSideEmpty() throws Exception {
+		super.testRightSideEmpty();
+	}
+
+	@Test
+	public void testLeftSideEmpty() throws Exception {
+		super.testLeftSideEmpty();
+	}
+
+	@Test
+	public void testFullOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f);
+	}
+
+	@Test
+	public void testLeftOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f);
+	}
+
+	@Test
+	public void testRightOuterJoinWithHighNumberOfCommonKeys() {
+		testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f);
+	}
+}
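
For orientation, the two outer-join ITCases above only pick the iterator variant; the shared AbstractSortMergeOuterJoinIteratorITCase presumably drives whatever the factory returns through the usual JoinTaskIterator lifecycle, exactly like the inner-join ITCases further down in this mail. A minimal sketch of that driving loop, with input1/input2, the serializers, comparators, pair comparator, memoryManager, ioManager, PAGES_FOR_BNLJN, parentTask, joinFunction and collector assumed to be set up as in those inner-join tests:

	AbstractMergeOuterJoinIterator iterator = createOuterJoinIterator(
			OuterJoinType.LEFT, input1, input2, serializer1, comparator1,
			serializer2, comparator2, pairComparator, memoryManager, ioManager,
			PAGES_FOR_BNLJN, parentTask);

	iterator.open();
	while (iterator.callWithNextKey(joinFunction, collector)) {
		// each call aligns the next key: matching values are cross-joined, and for
		// OuterJoinType.LEFT a left record without a right partner is passed with a
		// null second argument (hence the null handling in MatchRemovingJoiner below)
	}
	iterator.close();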

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
index 539d864..4ac9093 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils;
 /**
  * Utility class for keeping track of matches in join operator tests.
  *
- * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher
+ * @see MatchRemovingJoiner
  */
 public class Match {
 	private final String left;

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
new file mode 100644
index 0000000..e588d92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+public final class MatchRemovingJoiner implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> {
+	private static final long serialVersionUID = 1L;
+
+	private final Map<Integer, Collection<Match>> toRemoveFrom;
+
+	public MatchRemovingJoiner(Map<Integer, Collection<Match>> map) {
+		this.toRemoveFrom = map;
+	}
+
+	@Override
+	public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
+		final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0);
+		final String value1 = rec1 != null ? (String) rec1.getField(1) : null;
+		final String value2 = rec2 != null ? (String) rec2.getField(1) : null;
+
+		Collection<Match> matches = this.toRemoveFrom.get(key);
+		if (matches == null) {
+			Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
+		}
+
+		boolean contained = matches.remove(new Match(value1, value2));
+		if (!contained) {
+			Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
+		}
+		if (matches.isEmpty()) {
+			this.toRemoveFrom.remove(key);
+		}
+	}
+}
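
The joiner above encodes the test oracle: every pair the iterator produces must correspond to exactly one precomputed Match, nothing may be produced twice, and whatever is left in the map afterwards was missed. A small hypothetical fragment showing how it is fed (inside a test method declared with throws Exception; data invented for illustration, imports as in the ITCases plus java.util.Arrays; removal relies on Match equality):

	Map<Integer, Collection<Match>> expected = new HashMap<Integer, Collection<Match>>();
	expected.put(1, new ArrayList<Match>(Arrays.asList(new Match("a", "x"))));

	MatchRemovingJoiner joiner = new MatchRemovingJoiner(expected);
	joiner.join(new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(1, "x"),
			new DiscardingOutputCollector<Tuple2<Integer, String>>());

	// the (1, a:x) match is now consumed: repeating the call or producing an unexpected
	// pair fails the test, and entries remaining at the end indicate missed matches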

http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
deleted file mode 100644
index f69b4d7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-import java.util.Collection;
-import java.util.Map;
-
-
-public final class MatchRemovingMatcher implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> {
-	private static final long serialVersionUID = 1L;
-
-	private final Map<Integer, Collection<Match>> toRemoveFrom;
-
-	public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) {
-		this.toRemoveFrom = map;
-	}
-
-	@Override
-	public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
-		final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0);
-		final String value1 = rec1 != null ? (String) rec1.getField(1) : null;
-		final String value2 = rec2 != null ? (String) rec2.getField(1) : null;
-
-		Collection<Match> matches = this.toRemoveFrom.get(key);
-		if (matches == null) {
-			Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
-		}
-
-		boolean contained = matches.remove(new Match(value1, value2));
-		if (!contained) {
-			Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
-		}
-		if (matches.isEmpty()) {
-			this.toRemoveFrom.remove(key);
-		}
-	}
-}


[4/5] flink git commit: [FLINK-2105] [tests] Move duplicate utility classes to testutil package

Posted by fh...@apache.org.
[FLINK-2105] [tests] Move duplicate utility classes to testutil package


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df9f4819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df9f4819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df9f4819

Branch: refs/heads/master
Commit: df9f4819b9368600c7531dbf4d4ec42c1cddea8f
Parents: db0b008
Author: r-pogalz <r....@campus.tu-berlin.de>
Authored: Mon Aug 3 12:59:01 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 21:35:27 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchDriver.java    |   8 +-
 .../sort/AbstractMergeInnerJoinIterator.java    | 108 ++++++
 .../sort/AbstractMergeMatchIterator.java        | 107 ------
 .../sort/NonReusingMergeInnerJoinIterator.java  |  59 +++
 .../sort/NonReusingMergeMatchIterator.java      |  59 ---
 .../sort/ReusingMergeInnerJoinIterator.java     |  64 ++++
 .../sort/ReusingMergeMatchIterator.java         |  64 ----
 ...ReusingSortMergeInnerJoinIteratorITCase.java | 318 ++++++++++++++++
 .../NonReusingSortMergeMatchIteratorITCase.java | 371 -------------------
 ...ReusingSortMergeInnerJoinIteratorITCase.java | 318 ++++++++++++++++
 .../ReusingSortMergeMatchIteratorITCase.java    | 371 -------------------
 .../operators/testutils/CollectionIterator.java |  61 +++
 .../runtime/operators/testutils/Match.java      |  63 ++++
 .../testutils/MatchRemovingMatcher.java         |  58 +++
 .../testutils/SimpleTupleJoinFunction.java      |  41 ++
 .../operators/util/HashVsSortMiniBenchmark.java |   6 +-
 16 files changed, 1097 insertions(+), 979 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index 0381aab..e54fca5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -126,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
 		if (this.objectReuseEnabled) {
 			switch (ls) {
 				case MERGE:
-					this.matchIterator = new ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					this.matchIterator = new ReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
@@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
 		} else {
 			switch (ls) {
 				case MERGE:
-					this.matchIterator = new NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					this.matchIterator = new NonReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
new file mode 100644
index 0000000..e9ccf52
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * joining through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeInnerJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+	public AbstractMergeInnerJoinIterator(
+			MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	/**
+	 * Calls the <code>JoinFunction#join()</code> method for all pairs of records that share the same key and come
+	 * from different inputs. The output of the <code>join()</code> method is forwarded.
+	 * <p>
+	 * This method first zig-zags between the two sorted inputs in order to find a common
+	 * key, and then calls the join stub with the cross product of the values.
+	 *
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+	 */
+	@Override
+	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector)
+			throws Exception {
+		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
+			// consume all remaining keys (hack to prevent remaining inputs during iterations; let's get rid of this soon)
+			while (this.iterator1.nextKey()) ;
+			while (this.iterator2.nextKey()) ;
+
+			return false;
+		}
+
+		final TypePairComparator<T1, T2> comparator = this.pairComparator;
+		comparator.setReference(this.iterator1.getCurrent());
+		T2 current2 = this.iterator2.getCurrent();
+
+		// zig zag
+		while (true) {
+			// determine the relation between the (possibly composite) keys
+			final int comp = comparator.compareToReference(current2);
+
+			if (comp == 0) {
+				break;
+			}
+
+			if (comp < 0) {
+				if (!this.iterator2.nextKey()) {
+					return false;
+				}
+				current2 = this.iterator2.getCurrent();
+			} else {
+				if (!this.iterator1.nextKey()) {
+					return false;
+				}
+				comparator.setReference(this.iterator1.getCurrent());
+			}
+		}
+
+		// here, we have a common key! call the join function with the cross product of the
+		// values
+		final Iterator<T1> values1 = this.iterator1.getValues();
+		final Iterator<T2> values2 = this.iterator2.getValues();
+
+		crossMatchingGroup(values1, values2, joinFunction, collector);
+		return true;
+	}
+}
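
The Javadoc on callWithNextKey() above is the heart of the sort-merge strategy: advance whichever input currently has the smaller key until both sides agree, then emit the cross product of the two key groups. A self-contained sketch of the same idea on plain sorted int arrays (hypothetical data, no Flink types involved):

	public class ZigZagSketch {
		public static void main(String[] args) {
			// both inputs sorted by key, duplicates grouped together
			// (the KeyGroupedIterators provide exactly these groups)
			int[] left  = {1, 3, 3, 5, 8};
			int[] right = {2, 3, 7, 8, 8};

			int i = 0, j = 0;
			while (i < left.length && j < right.length) {
				if (left[i] < right[j]) {
					i++;                                   // left is behind: advance left ("zig")
				} else if (left[i] > right[j]) {
					j++;                                   // right is behind: advance right ("zag")
				} else {
					int key = left[i];
					int iEnd = i, jEnd = j;
					while (iEnd < left.length && left[iEnd] == key) { iEnd++; }
					while (jEnd < right.length && right[jEnd] == key) { jEnd++; }
					// common key found: an inner join emits the cross product of the two
					// key groups, which is what crossMatchingGroup() does above
					System.out.println("key " + key + " -> " + (iEnd - i) * (jEnd - j) + " joined pairs");
					i = iEnd;
					j = jEnd;
				}
			}
		}
	}

Running this prints two joined pairs for key 3 and two for key 8; all other keys are skipped, which is the inner-join behaviour that the outer-join iterators in this change relax.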

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
deleted file mode 100644
index 791494d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.util.Iterator;
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
- * matching through a sort-merge join strategy.
- */
-public abstract class AbstractMergeMatchIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
-
-	public AbstractMergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2,
-									TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
-									TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
-									TypePairComparator<T1, T2> pairComparator,
-									MemoryManager memoryManager,
-									IOManager ioManager,
-									int numMemoryPages,
-									AbstractInvokable parentTask)
-			throws MemoryAllocationException {
-		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
-	}
-
-	/**
-	 * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come
-	 * from different inputs. The output of the <code>match()</code> method is forwarded.
-	 * <p>
-	 * This method first zig-zags between the two sorted inputs in order to find a common
-	 * key, and then calls the match stub with the cross product of the values.
-	 *
-	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
-	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
-	 */
-	@Override
-	public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector)
-			throws Exception {
-		if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
-			// consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon)
-			while (this.iterator1.nextKey()) ;
-			while (this.iterator2.nextKey()) ;
-
-			return false;
-		}
-
-		final TypePairComparator<T1, T2> comparator = this.pairComparator;
-		comparator.setReference(this.iterator1.getCurrent());
-		T2 current2 = this.iterator2.getCurrent();
-
-		// zig zag
-		while (true) {
-			// determine the relation between the (possibly composite) keys
-			final int comp = comparator.compareToReference(current2);
-
-			if (comp == 0) {
-				break;
-			}
-
-			if (comp < 0) {
-				if (!this.iterator2.nextKey()) {
-					return false;
-				}
-				current2 = this.iterator2.getCurrent();
-			} else {
-				if (!this.iterator1.nextKey()) {
-					return false;
-				}
-				comparator.setReference(this.iterator1.getCurrent());
-			}
-		}
-
-		// here, we have a common key! call the match function with the cross product of the
-		// values
-		final Iterator<T1> values1 = this.iterator1.getValues();
-		final Iterator<T2> values2 = this.iterator2.getValues();
-
-		crossMatchingGroup(values1, values2, matchFunction, collector);
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
new file mode 100644
index 0000000..644084c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+public class NonReusingMergeInnerJoinIterator<T1, T2, O> extends AbstractMergeInnerJoinIterator<T1, T2, O> {
+
+	public NonReusingMergeInnerJoinIterator(
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+	}
+
+	@Override
+	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+		return new NonReusingKeyGroupedIterator<T>(input, comparator);
+	}
+
+	@Override
+	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+		return serializer.copy(value);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
deleted file mode 100644
index 9705778..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
-import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-public class NonReusingMergeMatchIterator<T1, T2, O> extends AbstractMergeMatchIterator<T1, T2, O> {
-
-	public NonReusingMergeMatchIterator(
-			MutableObjectIterator<T1> input1,
-			MutableObjectIterator<T2> input2,
-			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
-			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
-			TypePairComparator<T1, T2> pairComparator,
-			MemoryManager memoryManager,
-			IOManager ioManager,
-			int numMemoryPages,
-			AbstractInvokable parentTask)
-			throws MemoryAllocationException {
-		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
-	}
-
-	@Override
-	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
-		return new NonReusingKeyGroupedIterator<T>(input, comparator);
-	}
-
-	@Override
-	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
-		return serializer.copy(value);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
new file mode 100644
index 0000000..3a1a17a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.KeyGroupedIterator;
+import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+public class ReusingMergeInnerJoinIterator<T1, T2, O> extends AbstractMergeInnerJoinIterator<T1, T2, O> {
+
+	public ReusingMergeInnerJoinIterator(
+			MutableObjectIterator<T1> input1,
+			MutableObjectIterator<T2> input2,
+			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+			TypePairComparator<T1, T2> pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
+		this.copy1 = serializer1.createInstance();
+		this.spillHeadCopy = serializer1.createInstance();
+		this.copy2 = serializer2.createInstance();
+		this.blockHeadCopy = serializer2.createInstance();
+	}
+
+	@Override
+	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
+		return new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
+	}
+
+	@Override
+	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
+		return serializer.copy(value, reuse);
+	}
+
+}
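
The only substantive difference between the reusing and non-reusing variants above is how records are materialized: NonReusingMergeInnerJoinIterator asks the serializer for a fresh copy per record, while ReusingMergeInnerJoinIterator pre-allocates instances (copy1, spillHeadCopy, copy2, blockHeadCopy) and overwrites them. A small hypothetical fragment showing the two serializer calls side by side (the TupleSerializer construction, including the unchecked cast, is copied from the ITCase below):

	TypeSerializer<Tuple2<Integer, String>> serializer = new TupleSerializer<Tuple2<Integer, String>>(
			(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
			new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });

	Tuple2<Integer, String> value = new Tuple2<Integer, String>(42, "forty-two");
	Tuple2<Integer, String> reuse = serializer.createInstance();

	Tuple2<Integer, String> fresh    = serializer.copy(value);        // non-reusing: a new object per record
	Tuple2<Integer, String> recycled = serializer.copy(value, reuse); // reusing: fills the pre-allocated instance

Which variant is instantiated is decided in MatchDriver via objectReuseEnabled, as the hunks above show.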

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
deleted file mode 100644
index c9cf5a2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.util.KeyGroupedIterator;
-import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-public class ReusingMergeMatchIterator<T1, T2, O> extends AbstractMergeMatchIterator<T1, T2, O> {
-
-	public ReusingMergeMatchIterator(
-			MutableObjectIterator<T1> input1,
-			MutableObjectIterator<T2> input2,
-			TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
-			TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
-			TypePairComparator<T1, T2> pairComparator,
-			MemoryManager memoryManager,
-			IOManager ioManager,
-			int numMemoryPages,
-			AbstractInvokable parentTask)
-			throws MemoryAllocationException {
-		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
-
-		this.copy1 = serializer1.createInstance();
-		this.spillHeadCopy = serializer1.createInstance();
-		this.copy2 = serializer2.createInstance();
-		this.blockHeadCopy = serializer2.createInstance();
-	}
-
-	@Override
-	protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) {
-		return new ReusingKeyGroupedIterator<T>(input, serializer, comparator);
-	}
-
-	@Override
-	protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) {
-		return serializer.copy(value, reuse);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
new file mode 100644
index 0000000..7fc3734
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+@SuppressWarnings("deprecation")
+public class NonReusingSortMergeInnerJoinIteratorITCase {
+	
+	// total memory
+	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+	private static final int PAGES_FOR_BNLJN = 2;
+
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 20000;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+	
+	// dummy abstract task
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private TypeSerializer<Tuple2<Integer, String>> serializer1;
+	private TypeSerializer<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
+	
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest() {
+		serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		comparator1 =  new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+		comparator2 =  new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+		pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
+		
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+		
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+	@Test
+	public void testMerge() {
+		try {
+
+			final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues(
+					collectData(input1),
+					collectData(input2));
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+					new MatchRemovingMatcher(expectedMatchesMap);
+
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+	
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+	
+			// compare with iterator values
+			NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>(
+					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
+					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+	
+			iterator.open();
+			
+			while (iterator.callWithNextKey(joinFunction, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+				Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testMergeWithHighNumberOfCommonKeys()
+	{
+		// the size of the left and right inputs
+		final int INPUT_1_SIZE = 200;
+		final int INPUT_2_SIZE = 100;
+		
+		final int INPUT_1_DUPLICATES = 10;
+		final int INPUT_2_DUPLICATES = 4000;
+		final int DUPLICATE_KEY = 13;
+		
+		try {
+			final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+			
+			// collect expected data
+			final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues(
+				collectData(input1),
+				collectData(input2));
+			
+			// re-create the whole thing for actual processing
+			
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+			
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+	
+			input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+			
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingMatcher(expectedMatchesMap);
+			
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+	
+			
+			// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
+			// needs to spill for the duplicate keys
+			NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>(
+					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
+					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+	
+			iterator.open();
+			
+			while (iterator.callWithNextKey(joinFunction, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//                                    Utilities
+	// --------------------------------------------------------------------------------------------
+
+	private Map<Integer, Collection<Match>> matchValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
+	{
+		Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>();
+
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+
+			if (rightValues == null) {
+				continue;
+			}
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<Match>());
+			}
+
+			Collection<Match> matchedValues = map.get(key);
+
+			for (String leftValue : leftValues) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new Match(leftValue, rightValue));
+				}
+			}
+		}
+
+		return map;
+	}
+
+
+	private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+			throws Exception
+	{
+		Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>();
+		Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
+
+		while ((pair = iter.next(pair)) != null) {
+			final Integer key = pair.getField(0);
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<String>());
+			}
+
+			Collection<String> values = map.get(key);
+			final String value = pair.getField(1);
+			values.add(value);
+		}
+
+		return map;
+	}
+
+}
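
The two helpers at the end of the ITCase build the expected-matches map: collectData() groups each input's values by key, and matchValues() keeps only the keys present on both sides and expands them into the per-key cross product. For hypothetical inputs this yields, for example:

	// left input  (key -> values): {7 -> [a, b], 9 -> [c]}
	// right input (key -> values): {7 -> [x, y]}
	// expectedMatchesMap:          {7 -> [Match(a, x), Match(a, y), Match(b, x), Match(b, y)]}
	// key 9 has no right-side partner and produces no expected inner-join match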

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
deleted file mode 100644
index 757b2e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class NonReusingSortMergeMatchIteratorITCase {
-	
-	// total memory
-	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
-	private static final int PAGES_FOR_BNLJN = 2;
-
-	// the size of the left and right inputs
-	private static final int INPUT_1_SIZE = 20000;
-
-	private static final int INPUT_2_SIZE = 1000;
-
-	// random seeds for the left and right input data generators
-	private static final long SEED1 = 561349061987311L;
-
-	private static final long SEED2 = 231434613412342L;
-	
-	// dummy abstract task
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-	private MemoryManager memoryManager;
-	
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
-	
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
-		
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManagerAsync();
-	}
-
-	@After
-	public void afterTest() {
-		if (this.ioManager != null) {
-			this.ioManager.shutdown();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
-			this.ioManager = null;
-		}
-		
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-
-
-	
-	@Test
-	public void testMerge() {
-		try {
-			
-			final TestData.Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			final TestData.Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-			
-			// collect expected data
-			final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
-				collectData(input1),
-				collectData(input2));
-			
-			final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
-			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
-	
-			// reset the generators
-			generator1.reset();
-			generator2.reset();
-			input1.reset();
-			input2.reset();
-	
-			// compare with iterator values
-			NonReusingMergeMatchIterator<Record, Record, Record> iterator =
-				new NonReusingMergeMatchIterator<Record, Record, Record>(
-					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
-					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-	
-			iterator.open();
-			
-			while (iterator.callWithNextKey(matcher, collector));
-			
-			iterator.close();
-	
-			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
-				Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testMergeWithHighNumberOfCommonKeys()
-	{
-		// the size of the left and right inputs
-		final int INPUT_1_SIZE = 200;
-		final int INPUT_2_SIZE = 100;
-		
-		final int INPUT_1_DUPLICATES = 10;
-		final int INPUT_2_DUPLICATES = 4000;
-		final int DUPLICATE_KEY = 13;
-		
-		try {
-			final TestData.Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			final TestData.Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
-			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
-			inList1.add(gen1Iter);
-			inList1.add(const1Iter);
-			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
-			inList2.add(gen2Iter);
-			inList2.add(const2Iter);
-			
-			MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
-			MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
-			
-			// collect expected data
-			final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
-				collectData(input1),
-				collectData(input2));
-			
-			// re-create the whole thing for actual processing
-			
-			// reset the generators and iterators
-			generator1.reset();
-			generator2.reset();
-			const1Iter.reset();
-			const2Iter.reset();
-			gen1Iter.reset();
-			gen2Iter.reset();
-			
-			inList1.clear();
-			inList1.add(gen1Iter);
-			inList1.add(const1Iter);
-			
-			inList2.clear();
-			inList2.add(gen2Iter);
-			inList2.add(const2Iter);
-	
-			input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
-			input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
-			
-			final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
-			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
-	
-			
-			// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
-			// needs to spill for the duplicate keys
-			NonReusingMergeMatchIterator<Record, Record, Record> iterator =
-				new NonReusingMergeMatchIterator<Record, Record, Record>(
-					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
-					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-	
-			iterator.open();
-			
-			while (iterator.callWithNextKey(matcher, collector));
-			
-			iterator.close();
-	
-			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
-				if (!entry.getValue().isEmpty()) {
-					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	
-	
-	// --------------------------------------------------------------------------------------------
-	//                                    Utilities
-	// --------------------------------------------------------------------------------------------
-
-	private Map<TestData.Key, Collection<Match>> matchValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
-	{
-		Map<TestData.Key, Collection<Match>> map = new HashMap<TestData.Key, Collection<Match>>();
-
-		for (TestData.Key key : leftMap.keySet()) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-
-			if (rightValues == null) {
-				continue;
-			}
-
-			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<Match>());
-			}
-
-			Collection<Match> matchedValues = map.get(key);
-
-			for (TestData.Value leftValue : leftValues) {
-				for (TestData.Value rightValue : rightValues) {
-					matchedValues.add(new Match(leftValue, rightValue));
-				}
-			}
-		}
-
-		return map;
-	}
-
-	
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(MutableObjectIterator<Record> iter)
-	throws Exception
-	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
-		
-		while ((pair = iter.next(pair)) != null) {
-			TestData.Key key = pair.getField(0, TestData.Key.class);
-			
-			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
-			}
-
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
-		}
-
-		return map;
-	}
-
-	/**
-	 * Private class used for storage of the expected matches in a hashmap.
-	 */
-	private static class Match {
-		private final Value left;
-
-		private final Value right;
-
-		public Match(Value left, Value right) {
-			this.left = left;
-			this.right = right;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			Match o = (Match) obj;
-			return this.left.equals(o.left) && this.right.equals(o.right);
-		}
-		
-		@Override
-		public int hashCode() {
-			return this.left.hashCode() ^ this.right.hashCode();
-		}
-
-		@Override
-		public String toString() {
-			return left + ", " + right;
-		}
-	}
-	
-	private static final class MatchRemovingMatcher extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private final Map<TestData.Key, Collection<Match>> toRemoveFrom;
-		
-		protected MatchRemovingMatcher(Map<TestData.Key, Collection<Match>> map) {
-			this.toRemoveFrom = map;
-		}
-		
-		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
-			TestData.Key key = rec1.getField(0, TestData.Key.class);
-			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
-			TestData.Value value2 = rec2.getField(1, TestData.Value.class);
-			
-			Collection<Match> matches = this.toRemoveFrom.get(key);
-			if (matches == null) {
-				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
-			}
-			
-			boolean contained = matches.remove(new Match(value1, value2));
-			if (!contained) {
-				Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
-			}
-			if (matches.isEmpty()) {
-				this.toRemoveFrom.remove(key);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
new file mode 100644
index 0000000..e4eec86
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.*;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+@SuppressWarnings("deprecation")
+public class ReusingSortMergeInnerJoinIteratorITCase {
+
+	// total memory
+	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+	private static final int PAGES_FOR_BNLJN = 2;
+
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 20000;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+
+	// dummy abstract task
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private TypeSerializer<Tuple2<Integer, String>> serializer1;
+	private TypeSerializer<Tuple2<Integer, String>> serializer2;
+	private TypeComparator<Tuple2<Integer, String>> comparator1;
+	private TypeComparator<Tuple2<Integer, String>> comparator2;
+	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
+
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void beforeTest() {
+		serializer1 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		serializer2 = new TupleSerializer<Tuple2<Integer, String>>(
+				(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+		comparator1 =  new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+		comparator2 =  new TupleComparator<Tuple2<Integer, String>>(
+				new int[]{0},
+				new TypeComparator<?>[] { new IntComparator(true) },
+				new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+		pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2);
+
+		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+	@Test
+	public void testMerge() {
+		try {
+
+			final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues(
+				collectData(input1),
+				collectData(input2));
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction =
+					new MatchRemovingMatcher(expectedMatchesMap);
+
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+
+			// compare with iterator values
+			ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>(
+					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
+					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+
+			iterator.open();
+
+			while (iterator.callWithNextKey(joinFunction, collector));
+
+			iterator.close();
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+				Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMergeWithHighNumberOfCommonKeys()
+	{
+		// the size of the left and right inputs
+		final int INPUT_1_SIZE = 200;
+		final int INPUT_2_SIZE = 100;
+
+		final int INPUT_1_DUPLICATES = 10;
+		final int INPUT_2_DUPLICATES = 4000;
+		final int DUPLICATE_KEY = 13;
+
+		try {
+			final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+			
+			final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+			
+			final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+			final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+			
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+			
+			MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+			
+			// collect expected data
+			final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues(
+				collectData(input1),
+				collectData(input2));
+			
+			// re-create the whole thing for actual processing
+			
+			// reset the generators and iterators
+			generator1.reset();
+			generator2.reset();
+			const1Iter.reset();
+			const2Iter.reset();
+			gen1Iter.reset();
+			gen2Iter.reset();
+			
+			inList1.clear();
+			inList1.add(gen1Iter);
+			inList1.add(const1Iter);
+			
+			inList2.clear();
+			inList2.add(gen2Iter);
+			inList2.add(const2Iter);
+	
+			input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate());
+			input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate());
+			
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingMatcher(expectedMatchesMap);
+			
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
+	
+			
+			// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
+			// needs to spill for the duplicate keys
+			ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>(
+					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
+					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
+	
+			iterator.open();
+			
+			while (iterator.callWithNextKey(matcher, collector));
+			
+			iterator.close();
+	
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+	
+	
+	
+	// --------------------------------------------------------------------------------------------
+	//                                    Utilities
+	// --------------------------------------------------------------------------------------------
+
+	private Map<Integer, Collection<Match>> matchValues(
+			Map<Integer, Collection<String>> leftMap,
+			Map<Integer, Collection<String>> rightMap)
+	{
+		Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>();
+
+		for (Integer key : leftMap.keySet()) {
+			Collection<String> leftValues = leftMap.get(key);
+			Collection<String> rightValues = rightMap.get(key);
+
+			if (rightValues == null) {
+				continue;
+			}
+
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<Match>());
+			}
+
+			Collection<Match> matchedValues = map.get(key);
+
+			for (String leftValue : leftValues) {
+				for (String rightValue : rightValues) {
+					matchedValues.add(new Match(leftValue, rightValue));
+				}
+			}
+		}
+
+		return map;
+	}
+
+	
+	private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter)
+	throws Exception
+	{
+		Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>();
+		Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
+		
+		while ((pair = iter.next(pair)) != null) {
+			final Integer key = pair.getField(0);
+			
+			if (!map.containsKey(key)) {
+				map.put(key, new ArrayList<String>());
+			}
+
+			Collection<String> values = map.get(key);
+			final String value = pair.getField(1);
+			values.add(value);
+		}
+
+		return map;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
deleted file mode 100644
index 474fa3c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.sort;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-@SuppressWarnings("deprecation")
-public class ReusingSortMergeMatchIteratorITCase {
-
-	// total memory
-	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
-	private static final int PAGES_FOR_BNLJN = 2;
-
-	// the size of the left and right inputs
-	private static final int INPUT_1_SIZE = 20000;
-
-	private static final int INPUT_2_SIZE = 1000;
-
-	// random seeds for the left and right input data generators
-	private static final long SEED1 = 561349061987311L;
-
-	private static final long SEED2 = 231434613412342L;
-
-	// dummy abstract task
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-	private MemoryManager memoryManager;
-
-	private TypeSerializer<Record> serializer1;
-	private TypeSerializer<Record> serializer2;
-	private TypeComparator<Record> comparator1;
-	private TypeComparator<Record> comparator2;
-	private TypePairComparator<Record, Record> pairComparator;
-
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest() {
-		this.serializer1 = RecordSerializer.get();
-		this.serializer2 = RecordSerializer.get();
-		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
-		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
-
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManagerAsync();
-	}
-
-	@After
-	public void afterTest() {
-		if (this.ioManager != null) {
-			this.ioManager.shutdown();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
-			this.ioManager = null;
-		}
-
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-
-
-
-	@Test
-	public void testMerge() {
-		try {
-
-			final Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-
-			final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-
-			// collect expected data
-			final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
-				collectData(input1),
-				collectData(input2));
-
-			final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
-
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
-
-			// reset the generators
-			generator1.reset();
-			generator2.reset();
-			input1.reset();
-			input2.reset();
-
-			// compare with iterator values
-			ReusingMergeMatchIterator<Record, Record, Record> iterator =
-				new ReusingMergeMatchIterator<Record, Record, Record>(
-					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
-					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-
-			iterator.open();
-
-			while (iterator.callWithNextKey(matcher, collector));
-
-			iterator.close();
-
-			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
-				Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMergeWithHighNumberOfCommonKeys()
-	{
-		// the size of the left and right inputs
-		final int INPUT_1_SIZE = 200;
-		final int INPUT_2_SIZE = 100;
-
-		final int INPUT_1_DUPLICATES = 10;
-		final int INPUT_2_DUPLICATES = 4000;
-		final int DUPLICATE_KEY = 13;
-
-		try {
-			final Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
-			
-			final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
-			final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
-			
-			final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
-			final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
-			
-			final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
-			inList1.add(gen1Iter);
-			inList1.add(const1Iter);
-			
-			final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
-			inList2.add(gen2Iter);
-			inList2.add(const2Iter);
-			
-			MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
-			MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
-			
-			// collect expected data
-			final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues(
-				collectData(input1),
-				collectData(input2));
-			
-			// re-create the whole thing for actual processing
-			
-			// reset the generators and iterators
-			generator1.reset();
-			generator2.reset();
-			const1Iter.reset();
-			const2Iter.reset();
-			gen1Iter.reset();
-			gen2Iter.reset();
-			
-			inList1.clear();
-			inList1.add(gen1Iter);
-			inList1.add(const1Iter);
-			
-			inList2.clear();
-			inList2.add(gen2Iter);
-			inList2.add(const2Iter);
-	
-			input1 = new MergeIterator<Record>(inList1, comparator1.duplicate());
-			input2 = new MergeIterator<Record>(inList2, comparator2.duplicate());
-			
-			final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap);
-			
-			final Collector<Record> collector = new DiscardingOutputCollector<Record>();
-	
-			
-			// we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it
-			// needs to spill for the duplicate keys
-			ReusingMergeMatchIterator<Record, Record, Record> iterator =
-				new ReusingMergeMatchIterator<Record, Record, Record>(
-					input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
-					this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask);
-	
-			iterator.open();
-			
-			while (iterator.callWithNextKey(matcher, collector));
-			
-			iterator.close();
-	
-			// assert that each expected match was seen
-			for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) {
-				if (!entry.getValue().isEmpty()) {
-					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	
-	
-	// --------------------------------------------------------------------------------------------
-	//                                    Utilities
-	// --------------------------------------------------------------------------------------------
-
-	private Map<TestData.Key, Collection<Match>> matchValues(
-			Map<TestData.Key, Collection<TestData.Value>> leftMap,
-			Map<TestData.Key, Collection<TestData.Value>> rightMap)
-	{
-		Map<TestData.Key, Collection<Match>> map = new HashMap<TestData.Key, Collection<Match>>();
-
-		for (TestData.Key key : leftMap.keySet()) {
-			Collection<TestData.Value> leftValues = leftMap.get(key);
-			Collection<TestData.Value> rightValues = rightMap.get(key);
-
-			if (rightValues == null) {
-				continue;
-			}
-
-			if (!map.containsKey(key)) {
-				map.put(key, new ArrayList<Match>());
-			}
-
-			Collection<Match> matchedValues = map.get(key);
-
-			for (TestData.Value leftValue : leftValues) {
-				for (TestData.Value rightValue : rightValues) {
-					matchedValues.add(new Match(leftValue, rightValue));
-				}
-			}
-		}
-
-		return map;
-	}
-
-	
-	private Map<TestData.Key, Collection<TestData.Value>> collectData(MutableObjectIterator<Record> iter)
-	throws Exception
-	{
-		Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
-		Record pair = new Record();
-		
-		while ((pair = iter.next(pair)) != null) {
-			TestData.Key key = pair.getField(0, TestData.Key.class);
-			
-			if (!map.containsKey(key)) {
-				map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
-			}
-
-			Collection<TestData.Value> values = map.get(key);
-			values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
-		}
-
-		return map;
-	}
-
-	/**
-	 * Private class used for storage of the expected matches in a hashmap.
-	 */
-	private static class Match {
-		private final Value left;
-
-		private final Value right;
-
-		public Match(Value left, Value right) {
-			this.left = left;
-			this.right = right;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			Match o = (Match) obj;
-			return this.left.equals(o.left) && this.right.equals(o.right);
-		}
-		
-		@Override
-		public int hashCode() {
-			return this.left.hashCode() ^ this.right.hashCode();
-		}
-
-		@Override
-		public String toString() {
-			return left + ", " + right;
-		}
-	}
-	
-	private static final class MatchRemovingMatcher extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private final Map<TestData.Key, Collection<Match>> toRemoveFrom;
-		
-		protected MatchRemovingMatcher(Map<TestData.Key, Collection<Match>> map) {
-			this.toRemoveFrom = map;
-		}
-		
-		@Override
-		public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
-			TestData.Key key = rec1.getField(0, TestData.Key.class);
-			TestData.Value value1 = rec1.getField(1, TestData.Value.class);
-			TestData.Value value2 = rec2.getField(1, TestData.Value.class);
-			
-			Collection<Match> matches = this.toRemoveFrom.get(key);
-			if (matches == null) {
-				Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
-			}
-			
-			boolean contained = matches.remove(new Match(value1, value2));
-			if (!contained) {
-				Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
-			}
-			if (matches.isEmpty()) {
-				this.toRemoveFrom.remove(key);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
new file mode 100644
index 0000000..7fd1b6c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+
+
+public class CollectionIterator<T> implements ResettableMutableObjectIterator<T> {
+
+	private final Collection<T> collection;
+	private Iterator<T> iterator;
+
+	public CollectionIterator(Collection<T> collection) {
+		this.collection = collection;
+		this.iterator = collection.iterator();
+	}
+
+	@Override
+	public T next(T reuse) throws IOException {
+		return next();
+	}
+
+	@Override
+	public T next() throws IOException {
+		if (!iterator.hasNext()) {
+			return null;
+		} else {
+			return iterator.next();
+		}
+	}
+
+	@Override
+	public void reset() throws IOException {
+		iterator = collection.iterator();
+	}
+
+	public static <T> CollectionIterator<T> of(T... values) {
+		return new CollectionIterator<T>(Arrays.asList(values));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
new file mode 100644
index 0000000..539d864
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+/**
+ * Utility class for keeping track of matches in join operator tests.
+ *
+ * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher
+ */
+public class Match {
+	private final String left;
+
+	private final String right;
+
+	public Match(String left, String right) {
+		this.left = left;
+		this.right = right;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		Match o = (Match) obj;
+		if (left == null && o.left == null && right.equals(o.right)) {
+			return true;
+		} else if (right == null && o.right == null && left.equals(o.left)) {
+			return true;
+		} else {
+			return this.left.equals(o.left) && this.right.equals(o.right);
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		if (left == null) {
+			return right.hashCode();
+		} else if (right == null) {
+			return left.hashCode();
+		} else {
+			return this.left.hashCode() ^ this.right.hashCode();
+		}
+	}
+
+	@Override
+	public String toString() {
+		return left + ", " + right;
+	}
+}


http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
new file mode 100644
index 0000000..f69b4d7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+public final class MatchRemovingMatcher implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> {
+	private static final long serialVersionUID = 1L;
+
+	private final Map<Integer, Collection<Match>> toRemoveFrom;
+
+	public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) {
+		this.toRemoveFrom = map;
+	}
+
+	@Override
+	public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
+		final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0);
+		final String value1 = rec1 != null ? (String) rec1.getField(1) : null;
+		final String value2 = rec2 != null ? (String) rec2.getField(1) : null;
+
+		Collection<Match> matches = this.toRemoveFrom.get(key);
+		if (matches == null) {
+			Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
+		}
+
+		boolean contained = matches.remove(new Match(value1, value2));
+		if (!contained) {
+			Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2);
+		}
+		if (matches.isEmpty()) {
+			this.toRemoveFrom.remove(key);
+		}
+	}
+}
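For orientation, here is a minimal sketch of how the two test utilities above are meant to be used together. This is not part of the commit; the class name and literal values are illustrative only. The pattern: a test records every expected (left, right) pairing per key as Match objects, hands the map to MatchRemovingMatcher, and the matcher crosses off each produced match, failing on anything unexpected. An empty map at the end means the join emitted exactly the expected pairs, which is what the ITCases above assert.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
    import org.apache.flink.runtime.operators.testutils.Match;
    import org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher;
    import org.junit.Assert;
    import org.junit.Test;

    public class MatchBookkeepingSketch {

        @Test
        public void expectedMatchesAreCrossedOff() throws Exception {
            // one expected pairing for key 1 (use a mutable list, since the matcher removes from it)
            Map<Integer, Collection<Match>> expected = new HashMap<Integer, Collection<Match>>();
            expected.put(1, new ArrayList<Match>(Arrays.asList(new Match("left-a", "right-x"))));

            MatchRemovingMatcher matcher = new MatchRemovingMatcher(expected);

            // the join under test produces (1, "left-a") x (1, "right-x"); the matcher crosses it off
            matcher.join(new Tuple2<Integer, String>(1, "left-a"),
                    new Tuple2<Integer, String>(1, "right-x"),
                    new DiscardingOutputCollector<Tuple2<Integer, String>>());

            // an empty map means every expected match was produced exactly once
            Assert.assertTrue(expected.isEmpty());
        }
    }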

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java
new file mode 100644
index 0000000..06a62e5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/SimpleTupleJoinFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Collector;
+
+/**
+ * Simple flat join function that joins two binary tuples and considers null cases.
+ */
+public class SimpleTupleJoinFunction implements FlatJoinFunction<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Object>> {
+
+	@Override
+	public void join(Tuple2<String, String> first, Tuple2<String, Integer> second, Collector<Tuple4<String, String, String, Object>> out) throws Exception {
+		if (first == null) {
+			out.collect(new Tuple4<String, String, String, Object>(null, null, second.f0, second.f1));
+		} else if (second == null) {
+			out.collect(new Tuple4<String, String, String, Object>(first.f0, first.f1, null, null));
+		} else {
+			out.collect(new Tuple4<String, String, String, Object>(first.f0, first.f1, second.f0, second.f1));
+		}
+	}
+}
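To illustrate the null handling mentioned in the Javadoc above, a hedged sketch follows. It is not part of the commit; the driver class and the inline Collector are illustrative and depend only on org.apache.flink.util.Collector. A missing input side is padded with nulls in the emitted Tuple4, as an outer-join driver would expect.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.runtime.operators.testutils.SimpleTupleJoinFunction;
    import org.apache.flink.util.Collector;

    public class SimpleTupleJoinFunctionSketch {

        public static void main(String[] args) throws Exception {
            final List<Tuple4<String, String, String, Object>> results =
                    new ArrayList<Tuple4<String, String, String, Object>>();

            // minimal list-backed collector so the sketch has no further dependencies
            Collector<Tuple4<String, String, String, Object>> out =
                    new Collector<Tuple4<String, String, String, Object>>() {
                        @Override
                        public void collect(Tuple4<String, String, String, Object> record) {
                            results.add(record);
                        }
                        @Override
                        public void close() {}
                    };

            SimpleTupleJoinFunction join = new SimpleTupleJoinFunction();

            join.join(new Tuple2<String, String>("a", "x"), new Tuple2<String, Integer>("a", 1), out); // both sides present
            join.join(null, new Tuple2<String, Integer>("b", 2), out);                                 // left side missing
            join.join(new Tuple2<String, String>("c", "y"), null, out);                                // right side missing

            // results: (a, x, a, 1), (null, null, b, 2), (c, y, null, null)
            System.out.println(results);
        }
    }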

http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 5d1ce7f..38d9992 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
@@ -143,8 +143,8 @@ public class HashVsSortMiniBenchmark {
 			final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();
 			
 			// compare with iterator values
-			ReusingMergeMatchIterator<Record, Record, Record> iterator =
-				new ReusingMergeMatchIterator<Record, Record, Record>(sortedInput1, sortedInput2,
+			ReusingMergeInnerJoinIterator<Record, Record, Record> iterator =
+				new ReusingMergeInnerJoinIterator<Record, Record, Record>(sortedInput1, sortedInput2,
 						this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
 						this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);
 			


[2/5] flink git commit: [FLINK-2105] Add support for sorted but sparse test data generation

Posted by fh...@apache.org.
[FLINK-2105] Add support for sorted but sparse test data generation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db0b0087
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db0b0087
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db0b0087

Branch: refs/heads/master
Commit: db0b0087b02985f55bcc6e65571b11ca33b0886f
Parents: 0dc6849
Author: Johann Kovacs <me...@jkovacs.de>
Authored: Fri Jul 10 17:26:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 21:35:27 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/testutils/TestData.java   | 207 +++++++++++++++++++
 1 file changed, 207 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db0b0087/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index fd34a3b..8688d4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators.testutils;
 import java.util.Comparator;
 import java.util.Random;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
@@ -279,6 +281,169 @@ public final class TestData {
 			this.counter = 0;
 		}
 	}
+
+	/**
+	 * Tuple2<Integer, String> generator.
+	 */
+	public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		public enum KeyMode {
+			SORTED, RANDOM, SORTED_SPARSE
+		};
+
+		public enum ValueMode {
+			FIX_LENGTH, RANDOM_LENGTH, CONSTANT
+		};
+
+		private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
+				'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
+
+		private final long seed;
+
+		private final int keyMax;
+
+		private final float keyDensity;
+
+		private final int valueLength;
+
+		private final KeyMode keyMode;
+
+		private final ValueMode valueMode;
+
+		private Random random;
+
+		private int counter;
+
+		private int key;
+		private String value;
+
+		public TupleGenerator(long seed, int keyMax, int valueLength) {
+			this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+		}
+
+		public TupleGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
+			this(seed, keyMax, valueLength, keyMode, valueMode, null);
+		}
+
+		public TupleGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+			this(seed, keyMax, 1.0f, valueLength, keyMode, valueMode, constant);
+		}
+
+		public TupleGenerator(long seed, int keyMax, float keyDensity, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+			this.seed = seed;
+			this.keyMax = keyMax;
+			this.keyDensity = keyDensity;
+			this.valueLength = valueLength;
+			this.keyMode = keyMode;
+			this.valueMode = valueMode;
+
+			this.random = new Random(seed);
+			this.counter = 0;
+
+			this.value = constant == null ? null : constant;
+		}
+
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
+			this.key = nextKey();
+			if (this.valueMode != ValueMode.CONSTANT) {
+				this.value = randomString();
+			}
+			reuse.setFields(this.key, this.value);
+			return reuse;
+		}
+
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
+		}
+
+		public boolean next(org.apache.flink.types.Value[] target) {
+			this.key = nextKey();
+			// TODO change this to something proper
+			((IntValue)target[0]).setValue(this.key);
+			((IntValue)target[1]).setValue(random.nextInt());
+			return true;
+		}
+
+		private int nextKey() {
+			if (keyMode == KeyMode.SORTED) {
+				return ++counter;
+			} else if (keyMode == KeyMode.SORTED_SPARSE) {
+				int max = (int) (1 / keyDensity);
+				counter += random.nextInt(max) + 1;
+				return counter;
+			} else {
+				return Math.abs(random.nextInt() % keyMax) + 1;
+			}
+		}
+
+		public void reset() {
+			this.random = new Random(seed);
+			this.counter = 0;
+		}
+
+		private String randomString() {
+			int length;
+
+			if (valueMode == ValueMode.FIX_LENGTH) {
+				length = valueLength;
+			} else {
+				length = valueLength - random.nextInt(valueLength / 3);
+			}
+
+			StringBuilder sb = new StringBuilder();
+			for (int i = 0; i < length; i++) {
+				sb.append(alpha[random.nextInt(alpha.length)]);
+			}
+			return sb.toString();
+		}
+
+	}
+
+
+	/**
+	 * Mock reader that returns a bounded number of tuples from a {@link TupleGenerator}.
+	 */
+	public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private final TupleGenerator generator;
+
+		private final int numberOfRecords;
+
+		private int counter;
+
+		public TupleGeneratorIterator(TupleGenerator generator, int numberOfRecords) {
+			this.generator = generator;
+			this.generator.reset();
+			this.numberOfRecords = numberOfRecords;
+			this.counter = 0;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> target) {
+			if (counter < numberOfRecords) {
+				counter++;
+				return generator.next(target);
+			}
+			else {
+				return null;
+			}
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			if (counter < numberOfRecords) {
+				counter++;
+				return generator.next();
+			}
+			else {
+				return null;
+			}
+		}
+
+		public void reset() {
+			this.counter = 0;
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -325,4 +490,46 @@ public final class TestData {
 			this.pos = 0;
 		}
 	}
+
+	public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+		private int key;
+		private String value;
+
+		private final String valueValue;
+
+
+		private final int numPairs;
+
+		private int pos;
+
+
+		public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
+			this.key = keyValue;
+			this.valueValue = valueValue;
+			this.numPairs = numPairs;
+		}
+
+		@Override
+		public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
+			if (pos < this.numPairs) {
+				this.value = this.valueValue + ' ' + pos;
+				reuse.setFields(this.key, this.value);
+				pos++;
+				return reuse;
+			}
+			else {
+				return null;
+			}
+		}
+
+		@Override
+		public Tuple2<Integer, String> next() {
+			return next(new Tuple2<Integer, String>());
+		}
+
+		public void reset() {
+			this.pos = 0;
+		}
+	}
 }
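To make the "sorted but sparse" mode this commit introduces concrete, here is a minimal usage sketch. It is not part of the commit; the seed, density, and lengths are illustrative. With KeyMode.SORTED_SPARSE the generated keys are strictly increasing, and the keyDensity parameter bounds the gap between consecutive keys at (int)(1 / keyDensity), which is what the seven-argument constructor above exposes.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
    import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
    import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;

    public class SparseKeySketch {

        public static void main(String[] args) {
            // sorted but sparse keys: strictly increasing, advancing by a random gap of at most (int)(1 / 0.2f) = 5
            TupleGenerator generator = new TupleGenerator(
                    4711L,                 // seed (illustrative)
                    Integer.MAX_VALUE,     // keyMax is not used by the sorted modes
                    0.2f,                  // key density
                    64,                    // value length
                    KeyMode.SORTED_SPARSE,
                    ValueMode.RANDOM_LENGTH,
                    null);                 // no constant value

            Tuple2<Integer, String> reuse = new Tuple2<Integer, String>();
            for (int i = 0; i < 5; i++) {
                reuse = generator.next(reuse);
                System.out.println(reuse.f0 + " -> " + reuse.f1);
            }
        }
    }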