You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:15 UTC

[09/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
new file mode 100644
index 0000000..3b940c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
+ * internally to join the records with equal key. The build side of the hash table is the first input of the join,
+ * the second input is probed against it. This implementation DOES NOT reuse objects.
+ */
+public class NonReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+	protected final MutableHashTable<V1, V2> hashJoin;
+
+	protected final TypeSerializer<V2> probeSideSerializer; // copies the probe record when a key has several build-side records
+
+	private final MemoryManager memManager;
+
+	private final MutableObjectIterator<V1> firstInput;
+
+	private final MutableObjectIterator<V2> secondInput;
+
+	private final boolean joinWithEmptyBuildSide; // if true, a probe record without build-side records is joined with null
+
+	private volatile boolean running = true; // cleared by abort() to stop the loop over the remaining build-side records
+
+	// --------------------------------------------------------------------------------------------
+
+	public NonReusingBuildFirstHashJoinIterator(
+			MutableObjectIterator<V1> firstInput,
+			MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1,
+			TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2,
+			TypeComparator<V2> comparator2,
+			TypePairComparator<V2, V1> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean joinWithEmptyBuildSide,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
+		this.memManager = memManager;
+		this.firstInput = firstInput;
+		this.secondInput = secondInput;
+		this.probeSideSerializer = serializer2;
+		// the combination of bitmap filters and joining with an empty build side is rejected
+		if(useBitmapFilters && joinWithEmptyBuildSide) {
+			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+		}
+		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+
+		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
+				pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void open() throws IOException, MemoryAllocationException, InterruptedException {
+		this.hashJoin.open(this.firstInput, this.secondInput); // first input is the build side, second input the probe side
+	}
+	
+
+	@Override
+	public void close() {
+		// close the join
+		this.hashJoin.close();
+		
+		// hand the segments freed by the hash join back to the memory manager
+		final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
+		this.memManager.release(segments);
+	}
+
+	@Override
+	public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+	throws Exception
+	{
+		if (this.hashJoin.nextRecord())
+		{
+			// we have a next record, get the iterator over the build side values for it
+			final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
+			V1 nextBuildSideRecord;
+			
+			// get the first build side value
+			if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
+				V1 tmpRec;
+				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+				
+				// check if there is another build-side value
+				if ((tmpRec = buildSideIterator.next()) != null) {
+					// more than one build-side value --> every join call gets its own copy of the probe record
+					V2 probeCopy;
+					probeCopy = this.probeSideSerializer.copy(probeRecord);
+					
+					// call join on the first pair
+					matchFunction.join(nextBuildSideRecord, probeCopy, collector);
+					
+					// call join on the second pair
+					probeCopy = this.probeSideSerializer.copy(probeRecord);
+					matchFunction.join(tmpRec, probeCopy, collector);
+					
+					while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
+						// call join on the next pair; the running flag is only checked here, from the third pair on
+						// use a fresh copy so that the join function always sees the original probe side record
+						probeCopy = this.probeSideSerializer.copy(probeRecord);
+						matchFunction.join(nextBuildSideRecord, probeCopy, collector);
+					}
+				}
+				else {
+					// exactly one build-side value --> the probe record is passed on without copying
+					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
+				}
+			}
+			else if(joinWithEmptyBuildSide) {
+				// no build-side value for this probe record --> join the probe record with null
+				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+				matchFunction.join(null, probeRecord, collector);
+			}
+			return true; // a record was processed, even if the join function was not called for it
+		}
+		else {
+			return false; // hashJoin.nextRecord() reported no further record
+		}
+	}
+
+	@Override
+	public void abort() {
+		this.running = false; // stops the build-side loop in callWithNextKey()
+		this.hashJoin.abort();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
deleted file mode 100644
index dbdb5b2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the first input of the match.
- * This implementation DOES NOT reuse objects.
- */
-public class NonReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-
-	protected final MutableHashTable<V1, V2> hashJoin;
-
-	protected final TypeSerializer<V2> probeSideSerializer;
-
-	private final MemoryManager memManager;
-
-	private final MutableObjectIterator<V1> firstInput;
-
-	private final MutableObjectIterator<V2> secondInput;
-
-	private volatile boolean running = true;
-
-	// --------------------------------------------------------------------------------------------
-
-	public NonReusingBuildFirstHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V2, V1> pairComparator,
-			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		this.memManager = memManager;
-		this.firstInput = firstInput;
-		this.secondInput = secondInput;
-		this.probeSideSerializer = serializer2;
-
-		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
-				pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.firstInput, this.secondInput);
-	}
-	
-
-	@Override
-	public void close() {
-		// close the join
-		this.hashJoin.close();
-		
-		// free the memory
-		final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
-		this.memManager.release(segments);
-	}
-
-	@Override
-	public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		if (this.hashJoin.nextRecord())
-		{
-			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V1 nextBuildSideRecord;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
-				V1 tmpRec;
-				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
-				// check if there is another build-side value
-				if ((tmpRec = buildSideIterator.next()) != null) {
-					// more than one build-side value --> copy the probe side
-					V2 probeCopy;
-					probeCopy = this.probeSideSerializer.copy(probeRecord);
-					
-					// call match on the first pair
-					matchFunction.join(nextBuildSideRecord, probeCopy, collector);
-					
-					// call match on the second pair
-					probeCopy = this.probeSideSerializer.copy(probeRecord);
-					matchFunction.join(tmpRec, probeCopy, collector);
-					
-					while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
-						// call match on the next pair
-						// make sure we restore the value of the probe side record
-						probeCopy = this.probeSideSerializer.copy(probeRecord);
-						matchFunction.join(nextBuildSideRecord, probeCopy, collector);
-					}
-				}
-				else {
-					// only single pair matches
-					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-				}
-			}
-			return true;
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public void abort() {
-		this.running = false;
-		this.hashJoin.abort();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..77521af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+public class NonReusingBuildFirstReOpenableHashJoinIterator<V1, V2, O> extends NonReusingBuildFirstHashJoinIterator<V1, V2, O> {
+
+	/** The same object as the inherited {@code hashJoin} field, held with its re-openable type. */
+	private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
+
+	public NonReusingBuildFirstReOpenableHashJoinIterator(
+			MutableObjectIterator<V1> firstInput,
+			MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1,
+			TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2,
+			TypeComparator<V2> comparator2,
+			TypePairComparator<V2, V1> pairComparator,
+			MemoryManager memManager,
+			IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean joinWithEmptyBuildSide,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
+		super(firstInput, secondInput, serializer1, comparator1, serializer2,
+				comparator2, pairComparator, memManager, ioManager, ownerTask,
+				memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+		// the super constructor created hashJoin through the getHashJoin() override below, so the cast is safe
+		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
+	}
+
+	@Override
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+			TypePairComparator<PT, BT> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		// allocate the number of pages that memoryFraction corresponds to, on behalf of the owner task
+		final int numPages = memManager.computeNumberOfPages(memoryFraction);
+		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
+	}
+
+	/**
+	 * Sets a new input for the probe side by delegating to the re-openable hash table.
+	 * @throws java.io.IOException forwarded from {@code ReOpenableMutableHashTable#reopenProbe}
+	 */
+	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
+		reopenHashTable.reopenProbe(probeInput);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
deleted file mode 100644
index b51c3b1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends NonReusingBuildFirstHashMatchIterator<V1, V2, O> {
-
-
-	private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
-
-	public NonReusingBuildFirstReOpenableHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V2, V1> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction, useBitmapFilters);
-		
-		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
-	}
-
-	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
-			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
-			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		final int numPages = memManager.computeNumberOfPages(memoryFraction);
-		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
-				buildSideComparator, probeSideComparator, pairComparator,
-				memorySegments, ioManager, useBitmapFilters);
-	}
-
-	/**
-	 * Set new input for probe side
-	 * @throws java.io.IOException
-	 */
-	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
-		reopenHashTable.reopenProbe(probeInput);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
new file mode 100644
index 0000000..9ea0b74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
+ * internally to join the records with equal key. The build side is the second input. This implementation DOES NOT reuse objects.
+ */
+public class NonReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+	protected final MutableHashTable<V2, V1> hashJoin;
+
+	protected final TypeSerializer<V1> probeSideSerializer; // copies the probe record when a key has several build-side records
+
+	private final MemoryManager memManager;
+
+	private final MutableObjectIterator<V1> firstInput;
+
+	private final MutableObjectIterator<V2> secondInput;
+
+	private final boolean joinWithEmptyBuildSide; // if true, a probe record without build-side records is joined with null
+
+	private volatile boolean running = true; // cleared by abort() to stop the loop over the remaining build-side records
+
+	// --------------------------------------------------------------------------------------------
+
+	public NonReusingBuildSecondHashJoinIterator(
+			MutableObjectIterator<V1> firstInput,
+			MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1,
+			TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2,
+			TypeComparator<V2> comparator2,
+			TypePairComparator<V1, V2> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean joinWithEmptyBuildSide,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
+		this.memManager = memManager;
+		this.firstInput = firstInput;
+		this.secondInput = secondInput;
+		this.probeSideSerializer = serializer1;
+		// the combination of bitmap filters and joining with an empty build side is rejected
+		if(useBitmapFilters && joinWithEmptyBuildSide) {
+			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+		}
+		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+		
+		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
+				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void open() throws IOException, MemoryAllocationException, InterruptedException {
+		this.hashJoin.open(this.secondInput, this.firstInput); // second input is the build side, first input the probe side
+	}
+
+	@Override
+	public void close() {
+		// close the join
+		this.hashJoin.close();
+		
+		// hand the segments freed by the hash join back to the memory manager
+		final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
+		this.memManager.release(segments);
+	}
+
+	@Override
+	public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+	throws Exception
+	{
+		if (this.hashJoin.nextRecord())
+		{
+			// we have a next record, get the iterator over the build side values for it
+			final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
+			V2 nextBuildSideRecord;
+			
+			// get the first build side value
+			if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
+				V2 tmpRec;
+				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+				
+				// check if there is another build-side value
+				if ((tmpRec = buildSideIterator.next()) != null) {
+					// more than one build-side value --> every join call gets its own copy of the probe record
+					V1 probeCopy;
+					probeCopy = this.probeSideSerializer.copy(probeRecord);
+
+					// call join on the first pair
+					matchFunction.join(probeCopy, nextBuildSideRecord, collector);
+					
+					// call join on the second pair
+					probeCopy = this.probeSideSerializer.copy(probeRecord);
+					matchFunction.join(probeCopy, tmpRec, collector);
+					
+					while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
+						// call join on the next pair; the running flag is only checked here, from the third pair on
+						// use a fresh copy so that the join function always sees the original probe side record
+						probeCopy = this.probeSideSerializer.copy(probeRecord);
+						matchFunction.join(probeCopy, nextBuildSideRecord, collector);
+					}
+				}
+				else {
+					// exactly one build-side value --> the probe record is passed on without copying
+					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
+				}
+			}
+			else if(joinWithEmptyBuildSide) {
+				// no build-side value for this probe record --> join the probe record with null
+				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+
+				matchFunction.join(probeRecord, null, collector);
+			}
+			return true; // a record was processed, even if the join function was not called for it
+		}
+		else {
+			return false; // hashJoin.nextRecord() reported no further record
+		}
+	}
+	
+	@Override
+	public void abort() {
+		this.running = false; // stops the build-side loop in callWithNextKey()
+		this.hashJoin.abort();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
deleted file mode 100644
index 26dba7b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the second input of the match.  
- */
-public class NonReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-
-	protected final MutableHashTable<V2, V1> hashJoin;
-
-	protected final TypeSerializer<V1> probeSideSerializer;
-
-	private final MemoryManager memManager;
-
-	private final MutableObjectIterator<V1> firstInput;
-
-	private final MutableObjectIterator<V2> secondInput;
-
-	private volatile boolean running = true;
-
-	// --------------------------------------------------------------------------------------------
-
-	public NonReusingBuildSecondHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V1, V2> pairComparator,
-			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		this.memManager = memManager;
-		this.firstInput = firstInput;
-		this.secondInput = secondInput;
-		this.probeSideSerializer = serializer1;
-		
-		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
-				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.secondInput, this.firstInput);
-	}
-
-	@Override
-	public void close() {
-		// close the join
-		this.hashJoin.close();
-		
-		// free the memory
-		final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
-		this.memManager.release(segments);
-	}
-
-	@Override
-	public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		if (this.hashJoin.nextRecord())
-		{
-			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V2 nextBuildSideRecord;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next()) != null) {
-				V2 tmpRec;
-				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
-				// check if there is another build-side value
-				if ((tmpRec = buildSideIterator.next()) != null) {
-					// more than one build-side value --> copy the probe side
-					V1 probeCopy;
-					probeCopy = this.probeSideSerializer.copy(probeRecord);
-
-					// call match on the first pair
-					matchFunction.join(probeCopy, nextBuildSideRecord, collector);
-					
-					// call match on the second pair
-					probeCopy = this.probeSideSerializer.copy(probeRecord);
-					matchFunction.join(probeCopy, tmpRec, collector);
-					
-					while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) {
-						// call match on the next pair
-						// make sure we restore the value of the probe side record
-						probeCopy = this.probeSideSerializer.copy(probeRecord);
-						matchFunction.join(probeCopy, nextBuildSideRecord, collector);
-					}
-				}
-				else {
-					// only single pair matches
-					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-				}
-			}
-			return true;
-		}
-		else {
-			return false;
-		}
-	}
-	
-	@Override
-	public void abort() {
-		this.running = false;
-		this.hashJoin.abort();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..c9c9165
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Variant of {@link NonReusingBuildSecondHashJoinIterator} that is backed by a
+ * {@link ReOpenableMutableHashTable}, so a new probe-side input can be set via {@link #reopenProbe}.
+ */
+public class NonReusingBuildSecondReOpenableHashJoinIterator<V1, V2, O> extends NonReusingBuildSecondHashJoinIterator<V1, V2, O> {
+
+	/** The inherited {@code hashJoin}, typed as the re-openable table created by {@link #getHashJoin}. */
+	private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
+
+	public NonReusingBuildSecondReOpenableHashJoinIterator(
+			MutableObjectIterator<V1> firstInput,
+			MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1,
+			TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2,
+			TypeComparator<V2> comparator2,
+			TypePairComparator<V1, V2> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask, double memoryFraction,
+			boolean joinWithEmptyBuildSide,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2, pairComparator,
+				memManager, ioManager, ownerTask, memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+		this.reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
+	}
+
+	/**
+	 * Allocates the number of pages the memory manager computes for {@code memoryFraction} on behalf
+	 * of {@code ownerTask} and creates a {@link ReOpenableMutableHashTable} on top of them.
+	 */
+	@Override
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+			TypePairComparator<PT, BT> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		final List<MemorySegment> pages =
+				memManager.allocatePages(ownerTask, memManager.computeNumberOfPages(memoryFraction));
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator, pages, ioManager, useBitmapFilters);
+	}
+
+	/**
+	 * Hands a new probe-side input to the re-openable hash table.
+	 * @param probeInput iterator over the new probe-side records
+	 * @throws java.io.IOException forwarded from the hash table's {@code reopenProbe}
+	 */
+	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
+		reopenHashTable.reopenProbe(probeInput);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
deleted file mode 100644
index 92b0fff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.util.List;
-
-public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends NonReusingBuildSecondHashMatchIterator<V1, V2, O> {
-
-
-	private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
-
-	public NonReusingBuildSecondReOpenableHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V1, V2> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-		
-		reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
-	}
-
-	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
-			TypeSerializer<BT> buildSideSerializer,
-			TypeComparator<BT> buildSideComparator,
-			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		final int numPages = memManager.computeNumberOfPages(memoryFraction);
-		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
-				buildSideComparator, probeSideComparator, pairComparator,
-				memorySegments, ioManager, useBitmapFilters);
-	}
-
-	/**
-	 * Set new input for probe side
-	 * @throws java.io.IOException
-	 */
-	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
-		reopenHashTable.reopenProbe(probeInput);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
new file mode 100644
index 0000000..c1e601d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that joins
+ * records with equal key through a {@link MutableHashTable}. The first input is the build side of the
+ * hash table, the second input is the probe side.
+ */
+public class ReusingBuildFirstHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+	/** The hash table; opened with the first input as build side and the second as probe side. */
+	protected final MutableHashTable<V1, V2> hashJoin;
+
+	/** Instance handed to the bucket iterator when fetching build-side records. */
+	private final V1 nextBuildSideObject;
+
+	/** Second instance, used when looking ahead for a second build-side match. */
+	private final V1 tempBuildSideRecord;
+
+	protected final TypeSerializer<V2> probeSideSerializer;
+
+	private final MemoryManager memManager;
+
+	private final MutableObjectIterator<V1> firstInput;
+
+	private final MutableObjectIterator<V2> secondInput;
+
+	/** If set, probe records without a build-side match are joined with {@code null}. */
+	private final boolean joinWithEmptyBuildSide;
+
+	/** Cleared by {@link #abort()}; tested before each further build-side match is fetched. */
+	private volatile boolean running = true;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates the iterator and its hash table (via {@code getHashJoin}).
+	 * @throws IllegalArgumentException if {@code useBitmapFilters} and {@code joinWithEmptyBuildSide} are both set
+	 */
+	public ReusingBuildFirstHashJoinIterator(
+			MutableObjectIterator<V1> firstInput,
+			MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1,
+			TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2,
+			TypeComparator<V2> comparator2,
+			TypePairComparator<V2, V1> pairComparator,
+			MemoryManager memManager,
+			IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean joinWithEmptyBuildSide,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		if (useBitmapFilters && joinWithEmptyBuildSide) {
+			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+		}
+
+		this.memManager = memManager;
+		this.firstInput = firstInput;
+		this.secondInput = secondInput;
+		this.probeSideSerializer = serializer2;
+		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+
+		this.nextBuildSideObject = serializer1.createInstance();
+		this.tempBuildSideRecord = serializer1.createInstance();
+
+		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2, pairComparator,
+				memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Opens the hash table with the first input as build side and the second input as probe side. */
+	@Override
+	public void open() throws IOException, MemoryAllocationException, InterruptedException {
+		hashJoin.open(firstInput, secondInput);
+	}
+
+	/** Closes the hash table and gives the memory segments it frees back to the memory manager. */
+	@Override
+	public void close() {
+		hashJoin.close();
+		memManager.release(hashJoin.getFreedMemory());
+	}
+
+	/**
+	 * Advances the hash table to its next record and calls the join function once for each
+	 * build-side record the bucket iterator returns for the current probe record. If there is no
+	 * such record and {@code joinWithEmptyBuildSide} is set, the probe record is joined with {@code null}.
+	 *
+	 * @return {@code false} if the hash table has no next record, {@code true} otherwise
+	 */
+	@Override
+	public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+			throws Exception {
+		if (!hashJoin.nextRecord()) {
+			return false;
+		}
+
+		final MutableHashTable.HashBucketIterator<V1, V2> matches = hashJoin.getBuildSideIterator();
+		V1 current = matches.next(nextBuildSideObject);
+
+		if (current == null) {
+			// no build-side record for this probe record
+			if (joinWithEmptyBuildSide) {
+				matchFunction.join(null, hashJoin.getCurrentProbeRecord(), collector);
+			}
+			return true;
+		}
+
+		final V2 probeRecord = hashJoin.getCurrentProbeRecord();
+
+		// look ahead with the second instance before the first pair is emitted
+		final V1 second = matches.next(tempBuildSideRecord);
+		matchFunction.join(current, probeRecord, collector);
+
+		if (second != null) {
+			matchFunction.join(second, probeRecord, collector);
+			// remaining matches; the probe record is passed on as is, no copy is made
+			while (running && (current = matches.next(current)) != null) {
+				matchFunction.join(current, probeRecord, collector);
+			}
+		}
+		return true;
+	}
+
+	/** Clears the running flag and aborts the hash table. */
+	@Override
+	public void abort() {
+		running = false;
+		hashJoin.abort();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
deleted file mode 100644
index 65dfd89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the first input of the match.
- */
-public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-	
-	protected final MutableHashTable<V1, V2> hashJoin;
-	
-	private final V1 nextBuildSideObject;
-
-	private final V1 tempBuildSideRecord;
-
-	protected final TypeSerializer<V2> probeSideSerializer;
-	
-	private final MemoryManager memManager;
-	
-	private final MutableObjectIterator<V1> firstInput;
-	
-	private final MutableObjectIterator<V2> secondInput;
-	
-	private volatile boolean running = true;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public ReusingBuildFirstHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V2, V1> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		this.memManager = memManager;
-		this.firstInput = firstInput;
-		this.secondInput = secondInput;
-		this.probeSideSerializer = serializer2;
-		
-		this.nextBuildSideObject = serializer1.createInstance();
-		this.tempBuildSideRecord = serializer1.createInstance();
-
-		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.firstInput, this.secondInput);
-	}
-	
-
-	@Override
-	public void close() {
-		// close the join
-		this.hashJoin.close();
-		
-		// free the memory
-		final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
-		this.memManager.release(segments);
-	}
-
-	@Override
-	public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		if (this.hashJoin.nextRecord())
-		{
-			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V1, V2> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V1 nextBuildSideRecord = this.nextBuildSideObject;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
-				V1 tmpRec = this.tempBuildSideRecord;
-				final V2 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
-				// check if there is another build-side value
-				if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
-
-					// call match on the first pair
-					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-					
-					// call match on the second pair
-					matchFunction.join(tmpRec, probeRecord, collector);
-					
-					while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
-						// call match on the next pair
-						// make sure we restore the value of the probe side record
-						matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-					}
-				}
-				else {
-					// only single pair matches
-					matchFunction.join(nextBuildSideRecord, probeRecord, collector);
-				}
-			}
-			return true;
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public void abort() {
-		this.running = false;
-		this.hashJoin.abort();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..1cc3f91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Variant of {@link ReusingBuildFirstHashJoinIterator} that is backed by a
+ * {@link ReOpenableMutableHashTable}, so a new probe-side input can be set via {@link #reopenProbe}.
+ */
+public class ReusingBuildFirstReOpenableHashJoinIterator<V1, V2, O> extends ReusingBuildFirstHashJoinIterator<V1, V2, O> {
+
+	/** The inherited {@code hashJoin}, typed as the re-openable table created by {@link #getHashJoin}. */
+	private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
+
+	public ReusingBuildFirstReOpenableHashJoinIterator(
+			MutableObjectIterator<V1> firstInput,
+			MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1,
+			TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2,
+			TypeComparator<V2> comparator2,
+			TypePairComparator<V2, V1> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask, double memoryFraction,
+			boolean joinWithEmptyBuildSide,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2, pairComparator,
+				memManager, ioManager, ownerTask, memoryFraction, joinWithEmptyBuildSide, useBitmapFilters);
+		this.reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
+	}
+
+	/**
+	 * Allocates the number of pages the memory manager computes for {@code memoryFraction} on behalf
+	 * of {@code ownerTask} and creates a {@link ReOpenableMutableHashTable} on top of them.
+	 */
+	@Override
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+			TypePairComparator<PT, BT> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask, double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		final List<MemorySegment> pages =
+				memManager.allocatePages(ownerTask, memManager.computeNumberOfPages(memoryFraction));
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator, pages, ioManager, useBitmapFilters);
+	}
+
+	/**
+	 * Hands a new probe-side input to the re-openable hash table.
+	 * @param probeInput iterator over the new probe-side records
+	 * @throws IOException forwarded from the hash table's {@code reopenProbe}
+	 */
+	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
+		reopenHashTable.reopenProbe(probeInput);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
deleted file mode 100644
index 5635865..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends ReusingBuildFirstHashMatchIterator<V1, V2, O> {
-
-	
-	private final ReOpenableMutableHashTable<V1, V2> reopenHashTable;
-	
-	public ReusingBuildFirstReOpenableHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V2, V1> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters)
-		throws MemoryAllocationException
-	{
-		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction, useBitmapFilters);
-		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
-	}
-
-	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
-			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
-			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		final int numPages = memManager.computeNumberOfPages(memoryFraction);
-		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
-				buildSideComparator, probeSideComparator, pairComparator,
-				memorySegments, ioManager, useBitmapFilters);
-	}
-	
-	/**
-	 * Set new input for probe side
-	 * @throws IOException 
-	 */
-	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
-		reopenHashTable.reopenProbe(probeInput);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
new file mode 100644
index 0000000..4402665
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * A {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} driven by a hybrid hash join. The hash table
+ * is built from the second input and probed with the first input; build-side records are read into reuse objects.
+ */
+public class ReusingBuildSecondHashJoinIterator<V1, V2, O> extends HashJoinIteratorBase implements JoinTaskIterator<V1, V2, O> {
+
+	/** Hash table obtained from {@code getHashJoin}; opened with the second input as its build side. */
+	protected final MutableHashTable<V2, V1> hashJoin;
+
+	/** Object passed to the bucket iterator as reuse target for the first build-side record. */
+	private final V2 nextBuildSideObject;
+
+	/** Object passed to the bucket iterator as reuse target for the second build-side record. */
+	private final V2 tempBuildSideRecord;
+
+	/** Serializer of the first input, which is the probe side of this join. */
+	protected final TypeSerializer<V1> probeSideSerializer;
+
+	/** Memory manager that gets the hash table's freed segments back in {@link #close()}. */
+	private final MemoryManager memManager;
+
+	/** First input; used as the probe side. */
+	private final MutableObjectIterator<V1> firstInput;
+
+	/** Second input; used as the build side. */
+	private final MutableObjectIterator<V2> secondInput;
+
+	/** If set, a probe record without any build-side record is joined with {@code null}. */
+	private final boolean joinWithEmptyBuildSide;
+
+	/** Set to {@code false} by {@link #abort()}; checked while iterating over further build-side records. */
+	private volatile boolean running = true;
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Creates the two build-side reuse objects and obtains the hash table through {@code getHashJoin}. */
+	public ReusingBuildSecondHashJoinIterator(
+			MutableObjectIterator<V1> firstInput, MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1, TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
+			TypePairComparator<V1, V2> pairComparator,
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask,
+			double memoryFraction, boolean joinWithEmptyBuildSide, boolean useBitmapFilters)
+			throws MemoryAllocationException {
+		// the two options exclude each other, so reject the combination before anything else
+		if (joinWithEmptyBuildSide && useBitmapFilters) {
+			throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side");
+		}
+
+		this.firstInput = firstInput;
+		this.secondInput = secondInput;
+		this.probeSideSerializer = serializer1;
+		this.memManager = memManager;
+		this.joinWithEmptyBuildSide = joinWithEmptyBuildSide;
+
+		// two separate instances, because two build-side records are held at the same time
+		this.nextBuildSideObject = serializer2.createInstance();
+		this.tempBuildSideRecord = serializer2.createInstance();
+
+		// the second input is the build side, hence its serializer and comparator come first
+		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
+				memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Opens the hash table with the second input as build side and the first input as probe side. */
+	@Override
+	public void open() throws IOException, MemoryAllocationException, InterruptedException {
+		this.hashJoin.open(this.secondInput, this.firstInput);
+	}
+
+	/** Closes the hash table and passes the memory it reports as freed back to the memory manager. */
+	@Override
+	public void close() {
+		// close first, then release whatever segments the table hands out
+		this.hashJoin.close();
+		this.memManager.release(this.hashJoin.getFreedMemory());
+	}
+
+	/**
+	 * Moves the hash table to its next record and calls the join function for the current probe record
+	 * with every build-side record of the bucket iterator (iteration past the second one ends early after
+	 * {@link #abort()}). If there is none and {@code joinWithEmptyBuildSide} is set, {@code null} is passed.
+	 *
+	 * @return {@code false} if the hash table had no next record, {@code true} otherwise
+	 */
+	@Override
+	public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
+			throws Exception {
+		if (!this.hashJoin.nextRecord()) {
+			return false;
+		}
+
+		final MutableHashTable.HashBucketIterator<V2, V1> matches = this.hashJoin.getBuildSideIterator();
+		V2 current = matches.next(this.nextBuildSideObject);
+
+		if (current == null) {
+			// the iterator returned no build-side record for this probe record
+			if (this.joinWithEmptyBuildSide) {
+				matchFunction.join(this.hashJoin.getCurrentProbeRecord(), null, collector);
+			}
+			return true;
+		}
+
+		final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
+		// fetch the second record into its own reuse object before the first pair is emitted
+		final V2 second = matches.next(this.tempBuildSideRecord);
+
+		matchFunction.join(probeRecord, current, collector);
+		if (second != null) {
+			matchFunction.join(probeRecord, second, collector);
+
+			// all further records go through 'current' again; abort() ends this loop via 'running'
+			while (this.running && (current = matches.next(current)) != null) {
+				matchFunction.join(probeRecord, current, collector);
+			}
+		}
+		return true;
+	}
+
+	/** Clears the {@code running} flag and aborts the hash table. */
+	@Override
+	public void abort() {
+		this.running = false;
+		this.hashJoin.abort();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
deleted file mode 100644
index 156f259..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that uses a hybrid-hash-join
- * internally to match the records with equal key. The build side of the hash is the second input of the match.  
- */
-public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIteratorBase implements JoinTaskIterator<V1, V2, O> {
-	
-	protected final MutableHashTable<V2, V1> hashJoin;
-	
-	private final V2 nextBuildSideObject;
-	
-	private final V2 tempBuildSideRecord;
-	
-	protected final TypeSerializer<V1> probeSideSerializer;
-	
-	private final MemoryManager memManager;
-	
-	private final MutableObjectIterator<V1> firstInput;
-	
-	private final MutableObjectIterator<V2> secondInput;
-	
-	private volatile boolean running = true;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public ReusingBuildSecondHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V1, V2> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		this.memManager = memManager;
-		this.firstInput = firstInput;
-		this.secondInput = secondInput;
-		this.probeSideSerializer = serializer1;
-		
-		this.nextBuildSideObject = serializer2.createInstance();
-		this.tempBuildSideRecord = serializer2.createInstance();
-
-		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
-			memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void open() throws IOException, MemoryAllocationException, InterruptedException {
-		this.hashJoin.open(this.secondInput, this.firstInput);
-	}
-
-	@Override
-	public void close() {
-		// close the join
-		this.hashJoin.close();
-		
-		// free the memory
-		final List<MemorySegment> segments = this.hashJoin.getFreedMemory();
-		this.memManager.release(segments);
-	}
-
-	@Override
-	public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector)
-	throws Exception
-	{
-		if (this.hashJoin.nextRecord())
-		{
-			// we have a next record, get the iterators to the probe and build side values
-			final MutableHashTable.HashBucketIterator<V2, V1> buildSideIterator = this.hashJoin.getBuildSideIterator();
-			V2 nextBuildSideRecord = this.nextBuildSideObject;
-			
-			// get the first build side value
-			if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) {
-				V2 tmpRec = this.tempBuildSideRecord;
-				final V1 probeRecord = this.hashJoin.getCurrentProbeRecord();
-				
-				// check if there is another build-side value
-				if ((tmpRec = buildSideIterator.next(tmpRec)) != null) {
-					// call match on the first pair
-					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-					
-					// call match on the second pair
-					matchFunction.join(probeRecord, tmpRec, collector);
-					
-					while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) {
-						// call match on the next pair
-						// make sure we restore the value of the probe side record
-						matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-					}
-				}
-				else {
-					// only single pair matches
-					matchFunction.join(probeRecord, nextBuildSideRecord, collector);
-				}
-			}
-			return true;
-		}
-		else {
-			return false;
-		}
-	}
-	
-	@Override
-	public void abort() {
-		this.running = false;
-		this.hashJoin.abort();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
new file mode 100644
index 0000000..190398f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MutableObjectIterator;
+
+/** Variant of {@link ReusingBuildSecondHashJoinIterator} whose hash table can be given a new probe-side input. */
+public class ReusingBuildSecondReOpenableHashJoinIterator<V1, V2, O> extends ReusingBuildSecondHashJoinIterator<V1, V2, O> {
+
+	/** The inherited {@code hashJoin}, cast once to its re-openable type. */
+	private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
+
+	/** Forwards all arguments to the superclass and keeps a typed reference to the table it created. */
+	public ReusingBuildSecondReOpenableHashJoinIterator(
+			MutableObjectIterator<V1> firstInput, MutableObjectIterator<V2> secondInput,
+			TypeSerializer<V1> serializer1, TypeComparator<V1> comparator1,
+			TypeSerializer<V2> serializer2, TypeComparator<V2> comparator2,
+			TypePairComparator<V1, V2> pairComparator,
+			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask,
+			double memoryFraction, boolean joinWithEmptyBuildSide, boolean useBitmapFilters)
+			throws MemoryAllocationException {
+		super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2,
+				pairComparator, memManager, ioManager, ownerTask, memoryFraction,
+				joinWithEmptyBuildSide, useBitmapFilters);
+		// the cast relies on getHashJoin() of this class having produced the table; a subclass
+		// that overrides it with another table type would make the cast fail
+		this.reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) this.hashJoin;
+	}
+
+	/**
+	 * Allocates the pages computed for {@code memoryFraction} on behalf of {@code ownerTask} and
+	 * passes them to a new {@link ReOpenableMutableHashTable}.
+	 */
+	@Override
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
+			TypePairComparator<PT, BT> pairComparator,
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		final List<MemorySegment> pages = memManager.allocatePages(
+				ownerTask, memManager.computeNumberOfPages(memoryFraction));
+		return new ReOpenableMutableHashTable<BT, PT>(
+				buildSideSerializer, probeSideSerializer, buildSideComparator,
+				probeSideComparator, pairComparator, pages, ioManager, useBitmapFilters);
+	}
+
+	/**
+	 * Hands {@code probeInput} to the re-openable hash table as its new probe-side input.
+	 *
+	 * @param probeInput the iterator to use as the new probe side
+	 * @throws IOException propagated from the hash table's {@code reopenProbe} call
+	 */
+	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
+		this.reopenHashTable.reopenProbe(probeInput);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
deleted file mode 100644
index a0791fe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends ReusingBuildSecondHashMatchIterator<V1, V2, O> {
-
-	
-	private final ReOpenableMutableHashTable<V2, V1> reopenHashTable;
-	
-	public ReusingBuildSecondReOpenableHashMatchIterator(
-			MutableObjectIterator<V1> firstInput,
-			MutableObjectIterator<V2> secondInput,
-			TypeSerializer<V1> serializer1,
-			TypeComparator<V1> comparator1,
-			TypeSerializer<V2> serializer2,
-			TypeComparator<V2> comparator2,
-			TypePairComparator<V1, V2> pairComparator,
-			MemoryManager memManager,
-			IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
-		
-		reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
-	}
-
-	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
-			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
-			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager,
-			AbstractInvokable ownerTask,
-			double memoryFraction,
-			boolean useBitmapFilters) throws MemoryAllocationException {
-		
-		final int numPages = memManager.computeNumberOfPages(memoryFraction);
-		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
-				buildSideComparator, probeSideComparator, pairComparator,
-				memorySegments, ioManager, useBitmapFilters);
-	}
-	
-	/**
-	 * Set new input for probe side
-	 * @throws IOException 
-	 */
-	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
-		reopenHashTable.reopenProbe(probeInput);
-	}
-}