Posted to commits@flink.apache.org by se...@apache.org on 2015/08/06 18:52:15 UTC

[1/5] flink git commit: [FLINK-2492] [runtime] Rename former 'match' classes to 'join' to reflect consistent naming scheme.

Repository: flink
Updated Branches:
  refs/heads/master 5a788ec23 -> 0b73b4387


[FLINK-2492] [runtime] Rename former 'match' classes to 'join' to reflect consistent naming scheme.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/685086a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/685086a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/685086a3

Branch: refs/heads/master
Commit: 685086a3dd9afcec2eec76485298bc7b3f031a3c
Parents: a5b84b2
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 6 15:18:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 16:56:36 2015 +0200

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |   4 +-
 .../AbstractCachedBuildSideJoinDriver.java      | 204 +++++++++++++++++++
 .../AbstractCachedBuildSideMatchDriver.java     | 204 -------------------
 .../operators/BuildFirstCachedJoinDriver.java   |  27 +++
 .../operators/BuildFirstCachedMatchDriver.java  |  27 ---
 .../operators/BuildSecondCachedJoinDriver.java  |  27 +++
 .../operators/BuildSecondCachedMatchDriver.java |  27 ---
 .../flink/runtime/operators/DriverStrategy.java |  10 +-
 .../flink/runtime/operators/JoinDriver.java     | 185 +++++++++++++++++
 .../flink/runtime/operators/MatchDriver.java    | 191 -----------------
 .../runtime/operators/CachedMatchTaskTest.java  |  22 +-
 .../operators/MatchTaskExternalITCase.java      |   6 +-
 .../flink/runtime/operators/MatchTaskTest.java  |  46 ++---
 .../ConnectedComponentsNepheleITCase.java       |   4 +-
 .../CustomCompensatableDanglingPageRank.java    |   4 +-
 ...mpensatableDanglingPageRankWithCombiner.java |   4 +-
 .../CompensatableDanglingPageRank.java          |   4 +-
 17 files changed, 495 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 6fd2796..d440063 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -70,7 +70,7 @@ import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.MatchDriver;
+import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.NoOpDriver;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -336,7 +336,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 					}
 					
 					// adjust the driver
-					if (conf.getDriver().equals(MatchDriver.class)) {
+					if (conf.getDriver().equals(JoinDriver.class)) {
 						conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class);
 					}
 					else if (conf.getDriver().equals(CoGroupDriver.class)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
new file mode 100644
index 0000000..aff8d01
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+
+	private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
+	
+	private final int buildSideIndex;
+	
+	private final int probeSideIndex;
+
+	private boolean objectReuseEnabled = false;
+
+
+	protected AbstractCachedBuildSideJoinDriver(int buildSideIndex, int probeSideIndex) {
+		this.buildSideIndex = buildSideIndex;
+		this.probeSideIndex = probeSideIndex;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isInputResettable(int inputNum) {
+		if (inputNum < 0 || inputNum > 1) {
+			throw new IndexOutOfBoundsException();
+		}
+		return inputNum == buildSideIndex;
+	}
+
+	@Override
+	public void initialize() throws Exception {
+		TaskConfig config = this.taskContext.getTaskConfig();
+		
+		TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
+		TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
+		TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+		TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
+		MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
+		MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
+
+		TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = 
+				this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
+
+		double availableMemory = config.getRelativeMemoryDriver();
+
+		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
+		objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+		if (objectReuseEnabled) {
+			if (buildSideIndex == 0 && probeSideIndex == 1) {
+
+				matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+						input1, input2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator21(comparator1, comparator2),
+						this.taskContext.getMemoryManager(),
+						this.taskContext.getIOManager(),
+						this.taskContext.getOwningNepheleTask(),
+						availableMemory);
+
+
+			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
+
+				matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+						input1, input2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator12(comparator1, comparator2),
+						this.taskContext.getMemoryManager(),
+						this.taskContext.getIOManager(),
+						this.taskContext.getOwningNepheleTask(),
+						availableMemory);
+
+			} else {
+				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
+			}
+		} else {
+			if (buildSideIndex == 0 && probeSideIndex == 1) {
+
+				matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
+						input1, input2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator21(comparator1, comparator2),
+						this.taskContext.getMemoryManager(),
+						this.taskContext.getIOManager(),
+						this.taskContext.getOwningNepheleTask(),
+						availableMemory);
+
+
+			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
+
+				matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
+						input1, input2,
+						serializer1, comparator1,
+						serializer2, comparator2,
+						pairComparatorFactory.createComparator12(comparator1, comparator2),
+						this.taskContext.getMemoryManager(),
+						this.taskContext.getIOManager(),
+						this.taskContext.getOwningNepheleTask(),
+						availableMemory);
+
+			} else {
+				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
+			}
+		}
+		
+		this.matchIterator.open();
+	}
+
+	@Override
+	public void prepare() throws Exception {
+		// nothing
+	}
+
+	@Override
+	public void run() throws Exception {
+
+		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		
+		while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
+			
+	}
+
+	@Override
+	public void cleanup() throws Exception {}
+	
+	@Override
+	public void reset() throws Exception {
+		
+		MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
+		MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
+
+		if (objectReuseEnabled) {
+			if (buildSideIndex == 0 && probeSideIndex == 1) {
+				final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+
+				matchIterator.reopenProbe(input2);
+			} else {
+				final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+				matchIterator.reopenProbe(input1);
+			}
+		} else {
+			if (buildSideIndex == 0 && probeSideIndex == 1) {
+				final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+
+				matchIterator.reopenProbe(input2);
+			} else {
+				final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
+				matchIterator.reopenProbe(input1);
+			}
+		}
+	}
+
+	@Override
+	public void teardown() {
+		this.running = false;
+		if (this.matchIterator != null) {
+			this.matchIterator.close();
+		}
+	}
+
+	@Override
+	public void cancel() {
+		this.running = false;
+		if (this.matchIterator != null) {
+			this.matchIterator.abort();
+		}
+	}
+}
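
For context, the cached drivers above are exercised through the resettable test path: the build side is materialized once, and reset() re-opens only the probe side on each superstep instead of rebuilding the hash table. A minimal sketch of that usage, distilled from the CachedMatchTaskTest hunks further down (getTaskConfig(), testResettableDriver and MockMatchStub are helpers from that test harness, not part of this file):

    // Configure the cached build-first hash strategy and give the driver all
    // of the task's managed memory, as the tests below do.
    getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
    getTaskConfig().setRelativeMemoryDriver(1.0f);

    BuildFirstCachedJoinDriver<Record, Record, Record> testTask =
            new BuildFirstCachedJoinDriver<Record, Record, Record>();

    // Runs the driver three times; between runs, reset() calls reopenProbe()
    // on the cached hash table rather than building it again.
    testResettableDriver(testTask, MockMatchStub.class, 3);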

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
deleted file mode 100644
index f3b2dfd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
-
-	private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
-	
-	private final int buildSideIndex;
-	
-	private final int probeSideIndex;
-
-	private boolean objectReuseEnabled = false;
-
-
-	protected AbstractCachedBuildSideMatchDriver(int buildSideIndex, int probeSideIndex) {
-		this.buildSideIndex = buildSideIndex;
-		this.probeSideIndex = probeSideIndex;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isInputResettable(int inputNum) {
-		if (inputNum < 0 || inputNum > 1) {
-			throw new IndexOutOfBoundsException();
-		}
-		return inputNum == buildSideIndex;
-	}
-
-	@Override
-	public void initialize() throws Exception {
-		TaskConfig config = this.taskContext.getTaskConfig();
-		
-		TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
-		TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
-		TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
-		TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
-		MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
-		MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
-
-		TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = 
-				this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
-
-		double availableMemory = config.getRelativeMemoryDriver();
-
-		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
-		objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
-		if (objectReuseEnabled) {
-			if (buildSideIndex == 0 && probeSideIndex == 1) {
-
-				matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
-						input1, input2,
-						serializer1, comparator1,
-						serializer2, comparator2,
-						pairComparatorFactory.createComparator21(comparator1, comparator2),
-						this.taskContext.getMemoryManager(),
-						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
-
-
-			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
-
-				matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
-						input1, input2,
-						serializer1, comparator1,
-						serializer2, comparator2,
-						pairComparatorFactory.createComparator12(comparator1, comparator2),
-						this.taskContext.getMemoryManager(),
-						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
-
-			} else {
-				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
-			}
-		} else {
-			if (buildSideIndex == 0 && probeSideIndex == 1) {
-
-				matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(
-						input1, input2,
-						serializer1, comparator1,
-						serializer2, comparator2,
-						pairComparatorFactory.createComparator21(comparator1, comparator2),
-						this.taskContext.getMemoryManager(),
-						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
-
-
-			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
-
-				matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(
-						input1, input2,
-						serializer1, comparator1,
-						serializer2, comparator2,
-						pairComparatorFactory.createComparator12(comparator1, comparator2),
-						this.taskContext.getMemoryManager(),
-						this.taskContext.getIOManager(),
-						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
-
-			} else {
-				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
-			}
-		}
-		
-		this.matchIterator.open();
-	}
-
-	@Override
-	public void prepare() throws Exception {
-		// nothing
-	}
-
-	@Override
-	public void run() throws Exception {
-
-		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
-		
-		while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
-			
-	}
-
-	@Override
-	public void cleanup() throws Exception {}
-	
-	@Override
-	public void reset() throws Exception {
-		
-		MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
-		MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
-
-		if (objectReuseEnabled) {
-			if (buildSideIndex == 0 && probeSideIndex == 1) {
-				final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
-
-				matchIterator.reopenProbe(input2);
-			} else {
-				final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
-				matchIterator.reopenProbe(input1);
-			}
-		} else {
-			if (buildSideIndex == 0 && probeSideIndex == 1) {
-				final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
-
-				matchIterator.reopenProbe(input2);
-			} else {
-				final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator;
-				matchIterator.reopenProbe(input1);
-			}
-		}
-	}
-
-	@Override
-	public void teardown() {
-		this.running = false;
-		if (this.matchIterator != null) {
-			this.matchIterator.close();
-		}
-	}
-
-	@Override
-	public void cancel() {
-		this.running = false;
-		if (this.matchIterator != null) {
-			this.matchIterator.abort();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java
new file mode 100644
index 0000000..6da221f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedJoinDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+public class BuildFirstCachedJoinDriver<IT1, IT2, OT> extends AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> {
+
+	public BuildFirstCachedJoinDriver() {
+		super(0, 1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java
deleted file mode 100644
index c141767..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildFirstCachedMatchDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-public class BuildFirstCachedMatchDriver<IT1, IT2, OT> extends AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> {
-
-	public BuildFirstCachedMatchDriver() {
-		super(0, 1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java
new file mode 100644
index 0000000..44824c5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedJoinDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+public class BuildSecondCachedJoinDriver<IT1, IT2, OT> extends AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> {
+
+	public BuildSecondCachedJoinDriver() {
+		super(1, 0);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java
deleted file mode 100644
index eed03ab..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BuildSecondCachedMatchDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-public class BuildSecondCachedMatchDriver<IT1, IT2, OT> extends AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> {
-
-	public BuildSecondCachedMatchDriver() {
-		super(1, 0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 4a0035c..3aadf2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -72,7 +72,7 @@ public enum DriverStrategy {
 	ALL_GROUP_COMBINE(AllGroupCombineDriver.class, null, PIPELINED, 0),
 
 	// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
-	MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
+	MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
 
 	// co-grouping inputs
 	CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, 2),
@@ -81,13 +81,13 @@ public enum DriverStrategy {
 	
 	
 	// the first input is build side, the second side is probe side of a hybrid hash table
-	HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	HYBRIDHASH_BUILD_FIRST(JoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
 	// the second input is build side, the first side is probe side of a hybrid hash table
-	HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+	HYBRIDHASH_BUILD_SECOND(JoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
 	// a cached variant of HYBRIDHASH_BUILD_FIRST, that can only be used inside of iterations
-	HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
 	//  cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
-	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
 	
 	// the second input is inner loop, the first input is outer loop and block-wise processed
 	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
new file mode 100644
index 0000000..af3da55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
+import org.apache.flink.runtime.operators.util.JoinTaskIterator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The join driver implements the logic of a join operator at runtime. It instantiates either
+ * hash or sort-merge based strategies to find joining pairs of records.
+ * 
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
+ */
+public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+	
+	protected static final Logger LOG = LoggerFactory.getLogger(JoinDriver.class);
+	
+	protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+	
+	private volatile JoinTaskIterator<IT1, IT2, OT> joinIterator; // the iterator that does the actual join 
+	
+	protected volatile boolean running;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+		this.taskContext = context;
+		this.running = true;
+	}
+
+	@Override
+	public int getNumberOfInputs() {
+		return 2;
+	}
+
+	@Override
+	public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
+		@SuppressWarnings("unchecked")
+		final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
+		return clazz;
+	}
+	
+	@Override
+	public int getNumberOfDriverComparators() {
+		return 2;
+	}
+
+	@Override
+	public void prepare() throws Exception{
+		final TaskConfig config = this.taskContext.getTaskConfig();
+		
+		// obtain task manager's memory manager and I/O manager
+		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
+		final IOManager ioManager = this.taskContext.getIOManager();
+		
+		// set up memory and I/O parameters
+		final double fractionAvailableMemory = config.getRelativeMemoryDriver();
+		final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
+		
+		// test minimum memory requirements
+		final DriverStrategy ls = config.getDriverStrategy();
+		
+		final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
+		final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
+
+		// get the key positions and types
+		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
+		final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
+		final TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+		final TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
+		
+		final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
+				this.taskContext.getUserCodeClassLoader());
+		if (pairComparatorFactory == null) {
+			throw new Exception("Missing pair comparator factory for join driver");
+		}
+
+		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
+		boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+		}
+
+		// create and return joining iterator according to provided local strategy.
+		if (objectReuseEnabled) {
+			switch (ls) {
+				case MERGE:
+					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+
+					break;
+				case HYBRIDHASH_BUILD_FIRST:
+					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				case HYBRIDHASH_BUILD_SECOND:
+					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				default:
+					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
+			}
+		} else {
+			switch (ls) {
+				case MERGE:
+					this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+
+					break;
+				case HYBRIDHASH_BUILD_FIRST:
+					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				case HYBRIDHASH_BUILD_SECOND:
+					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					break;
+				default:
+					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
+			}
+		}
+		
+		// open the iterator - this triggers the sorting or hash-table building
+		// and blocks until the iterator is ready
+		this.joinIterator.open();
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(this.taskContext.formatLogString("join task iterator ready."));
+		}
+	}
+
+	@Override
+	public void run() throws Exception {
+		final FlatJoinFunction<IT1, IT2, OT> joinStub = this.taskContext.getStub();
+		final Collector<OT> collector = this.taskContext.getOutputCollector();
+		final JoinTaskIterator<IT1, IT2, OT> joinIterator = this.joinIterator;
+		
+		while (this.running && joinIterator.callWithNextKey(joinStub, collector));
+	}
+
+	@Override
+	public void cleanup() throws Exception {
+		if (this.joinIterator != null) {
+			this.joinIterator.close();
+			this.joinIterator = null;
+		}
+	}
+	
+	@Override
+	public void cancel() {
+		this.running = false;
+		if (this.joinIterator != null) {
+			this.joinIterator.abort();
+		}
+	}
+}
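
The driver is strategy-driven: prepare() reads the DriverStrategy from the task config, instantiates the matching hash or sort-merge iterator, and blocks in open() until the build/sort phase is done; run() then streams joined pairs into the collector. A minimal sketch of driving it directly, distilled from the MatchTaskExternalITCase hunks further down (getTaskConfig(), testDriver and MockMatchStub are helpers from that test harness, and hash_frac is the relative memory fraction used there):

    // Pick a hash join with the first input as build side and assign the
    // driver its share of managed memory, as the tests below do.
    getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
    getTaskConfig().setRelativeMemoryDriver(hash_frac);

    JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();

    // Runs the driver lifecycle with the mock join function: prepare() builds
    // the hash table, run() probes it and emits joined records.
    testDriver(testTask, MockMatchStub.class);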

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
deleted file mode 100644
index e54fca5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
-import org.apache.flink.runtime.operators.util.JoinTaskIterator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * Match task which is executed by a Task Manager. The task has two inputs and one or multiple outputs.
- * It is provided with a JoinFunction implementation.
- * <p>
- * The MatchTask matches all pairs of records that share the same key and come from different inputs. Each pair of 
- * matching records is handed to the <code>match()</code> method of the JoinFunction.
- * 
- * @see org.apache.flink.api.common.functions.FlatJoinFunction
- */
-public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
-	
-	protected static final Logger LOG = LoggerFactory.getLogger(MatchDriver.class);
-	
-	protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
-	
-	private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;		// the iterator that does the actual matching
-	
-	protected volatile boolean running;
-
-	private boolean objectReuseEnabled = false;
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
-		this.taskContext = context;
-		this.running = true;
-	}
-
-	@Override
-	public int getNumberOfInputs() {
-		return 2;
-	}
-
-	@Override
-	public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
-		@SuppressWarnings("unchecked")
-		final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
-		return clazz;
-	}
-	
-	@Override
-	public int getNumberOfDriverComparators() {
-		return 2;
-	}
-
-	@Override
-	public void prepare() throws Exception{
-		final TaskConfig config = this.taskContext.getTaskConfig();
-		
-		// obtain task manager's memory manager and I/O manager
-		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
-		final IOManager ioManager = this.taskContext.getIOManager();
-		
-		// set up memory and I/O parameters
-		final double fractionAvailableMemory = config.getRelativeMemoryDriver();
-		final int numPages = memoryManager.computeNumberOfPages(fractionAvailableMemory);
-		
-		// test minimum memory requirements
-		final DriverStrategy ls = config.getDriverStrategy();
-		
-		final MutableObjectIterator<IT1> in1 = this.taskContext.getInput(0);
-		final MutableObjectIterator<IT2> in2 = this.taskContext.getInput(1);
-
-		// get the key positions and types
-		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
-		final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
-		final TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
-		final TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
-		
-		final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
-				this.taskContext.getUserCodeClassLoader());
-		if (pairComparatorFactory == null) {
-			throw new Exception("Missing pair comparator factory for Match driver");
-		}
-
-		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
-		this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("MatchDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
-		}
-
-		// create and return MatchTaskIterator according to provided local strategy.
-		if (this.objectReuseEnabled) {
-			switch (ls) {
-				case MERGE:
-					this.matchIterator = new ReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
-					break;
-				case HYBRIDHASH_BUILD_FIRST:
-					this.matchIterator = new ReusingBuildFirstHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
-					break;
-				case HYBRIDHASH_BUILD_SECOND:
-					this.matchIterator = new ReusingBuildSecondHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
-					break;
-				default:
-					throw new Exception("Unsupported driver strategy for Match driver: " + ls.name());
-			}
-		} else {
-			switch (ls) {
-				case MERGE:
-					this.matchIterator = new NonReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
-					break;
-				case HYBRIDHASH_BUILD_FIRST:
-					this.matchIterator = new NonReusingBuildFirstHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
-					break;
-				case HYBRIDHASH_BUILD_SECOND:
-					this.matchIterator = new NonReusingBuildSecondHashMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
-					break;
-				default:
-					throw new Exception("Unsupported driver strategy for Match driver: " + ls.name());
-			}
-		}
-		
-		// open MatchTaskIterator - this triggers the sorting or hash-table building
-		// and blocks until the iterator is ready
-		this.matchIterator.open();
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(this.taskContext.formatLogString("Match task iterator ready."));
-		}
-	}
-
-	@Override
-	public void run() throws Exception {
-		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
-		final Collector<OT> collector = this.taskContext.getOutputCollector();
-		final JoinTaskIterator<IT1, IT2, OT> matchIterator = this.matchIterator;
-		
-		while (this.running && matchIterator.callWithNextKey(matchStub, collector));
-	}
-
-	@Override
-	public void cleanup() throws Exception {
-		if (this.matchIterator != null) {
-			this.matchIterator.close();
-			this.matchIterator = null;
-		}
-	}
-	
-	@Override
-	public void cancel() {
-		this.running = false;
-		if (this.matchIterator != null) {
-			this.matchIterator.abort();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index a57287a..c93c302 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -78,7 +78,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+		BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -109,7 +109,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+		BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -140,7 +140,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+		BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -171,7 +171,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+		BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -202,7 +202,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+		BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockMatchStub.class, 3);
@@ -233,7 +233,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+		BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockFailingMatchStub.class, 3);
@@ -263,7 +263,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+		BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
 		
 		try {
 			testResettableDriver(testTask, MockFailingMatchStub.class, 3);
@@ -294,7 +294,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		final BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+		final BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -338,7 +338,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		final BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+		final BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -382,7 +382,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		final BuildFirstCachedMatchDriver<Record, Record, Record> testTask = new BuildFirstCachedMatchDriver<Record, Record, Record>();
+		final BuildFirstCachedJoinDriver<Record, Record, Record> testTask = new BuildFirstCachedJoinDriver<Record, Record, Record>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -426,7 +426,7 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(1.0f);
 		
-		final BuildSecondCachedMatchDriver<Record, Record, Record> testTask = new BuildSecondCachedMatchDriver<Record, Record, Record>();
+		final BuildSecondCachedJoinDriver<Record, Record, Record> testTask = new BuildSecondCachedJoinDriver<Record, Record, Record>();
 		
 		
 		final AtomicBoolean success = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 29be8f8..30c1610 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -80,7 +80,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -113,7 +113,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);
@@ -144,7 +144,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 16aea69..8fbf05e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -90,7 +90,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -124,7 +124,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -160,7 +160,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -196,7 +196,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -232,7 +232,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -268,7 +268,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
@@ -304,7 +304,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
@@ -339,7 +339,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
@@ -375,7 +375,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
@@ -404,7 +404,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
@@ -455,7 +455,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
@@ -506,7 +506,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
 		setNumFileHandlesForSort(4);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
@@ -556,7 +556,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);
@@ -587,7 +587,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);
@@ -618,7 +618,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);
@@ -649,7 +649,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);
@@ -680,7 +680,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockMatchStub.class);
@@ -711,7 +711,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockFailingMatchStub.class);
@@ -741,7 +741,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		try {
 			testDriver(testTask, MockFailingMatchStub.class);
@@ -773,7 +773,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 			getTaskConfig().setRelativeMemoryDriver(hash_frac);
 
-			final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 
 			final AtomicBoolean success = new AtomicBoolean(false);
 
@@ -827,7 +827,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 			getTaskConfig().setRelativeMemoryDriver(hash_frac);
 
-			final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 
 			final AtomicBoolean success = new AtomicBoolean(false);
 
@@ -876,7 +876,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -920,7 +920,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(hash_frac);
 		
-		final MatchDriver<Record, Record, Record> testTask = new MatchDriver<Record, Record, Record>();
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8e9266d..7a3639b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.GroupReduceDriver;
@@ -276,7 +276,7 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			headConfig.setIterationHeadIndexOfSyncOutput(2);
 
 			// the driver
-			headConfig.setDriver(BuildSecondCachedMatchDriver.class);
+			headConfig.setDriver(BuildSecondCachedJoinDriver.class);
 			headConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 			headConfig.setStubWrapper(
 				new UserCodeClassWrapper<NeighborWithComponentIDJoin>(NeighborWithComponentIDJoin.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index c29e868..a6e6b6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -204,7 +204,7 @@ public class CustomCompensatableDanglingPageRank {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
-		intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
+		intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
 		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		intermediateConfig.setRelativeMemoryDriver((double) matchMemory / totalMemoryConsumption);
 		intermediateConfig.addInputToGroup(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index de9e0a1..0bf258f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -204,7 +204,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
-		intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
+		intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
 		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
 		intermediateConfig.addInputToGroup(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/685086a3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index eb2ccdc..78038b3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -37,7 +37,7 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
+import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -183,7 +183,7 @@ public class CompensatableDanglingPageRank {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 		intermediateConfig.setIterationId(ITERATION_ID);
 //		intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
-		intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
+		intermediateConfig.setDriver(BuildSecondCachedJoinDriver.class);
 		intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
 		intermediateConfig.setRelativeMemoryDriver((double)matchMemory/totalMemoryConsumption);
 		intermediateConfig.addInputToGroup(0);


[3/5] flink git commit: [FLINK-2240] Add bloom filter to filter probe records during hash join.

Posted by se...@apache.org.
[FLINK-2240] Add bloom filter to filter probe records during hash join.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61dcae39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61dcae39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61dcae39

Branch: refs/heads/master
Commit: 61dcae391cb3b45ba3aff47d4d9163889d2958a4
Parents: 685086a
Author: chengxiang li <ch...@intel.com>
Authored: Fri Jul 3 23:53:47 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 17:14:39 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  13 +-
 .../operators/hash/MutableHashTable.java        | 145 +++++++++-
 .../runtime/operators/util/BloomFilter.java     | 226 ++++++++++++++++
 .../MutableHashTablePerformanceBenchmark.java   | 268 +++++++++++++++++++
 .../runtime/operators/util/BloomFilterTest.java | 162 +++++++++++
 5 files changed, 806 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
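For intuition, the change works as follows: when a hash partition spills during the build
phase, the hash codes of its records are folded into a small per-bucket bloom filter; during
the probe phase, probe records whose hash definitely misses the filter are dropped instead of
being spilled to disk. The following standalone sketch illustrates the filtering idea. It uses
java.util.BitSet instead of the MemorySegment-backed BitSet added by this commit, and the bit
and hash-function counts are illustrative; the actual values are derived from the bucket
layout (see MutableHashTable below).

    import java.util.BitSet;

    public class ProbeFilterSketch {

        public static void main(String[] args) {
            int bits = 896;           // 112-byte bucket body, as in the patch below
            int numHashFunctions = 3; // illustrative; the patch derives this from expected entries
            BitSet filter = new BitSet(bits);

            // build phase: hash codes of records in a spilled build partition
            int[] buildHashes = {42, 1337, 98765};
            for (int h : buildHashes) {
                addHash(filter, h, bits, numHashFunctions);
            }

            // probe phase: only spill a probe record if the filter reports a possible match
            int probeHash = 555;
            if (testHash(filter, probeHash, bits, numHashFunctions)) {
                System.out.println("possible match: spill probe record to disk");
            } else {
                System.out.println("definite miss: drop probe record, saving I/O");
            }
        }

        // Same double-hashing scheme as the patch: combinedHash = hash1 + i * hash2.
        static void addHash(BitSet filter, int hash32, int bits, int k) {
            int hash2 = hash32 >>> 16;
            for (int i = 1; i <= k; i++) {
                int combined = hash32 + i * hash2;
                if (combined < 0) {
                    combined = ~combined; // flip the bits if negative
                }
                filter.set(combined % bits);
            }
        }

        static boolean testHash(BitSet filter, int hash32, int bits, int k) {
            int hash2 = hash32 >>> 16;
            for (int i = 1; i <= k; i++) {
                int combined = hash32 + i * hash2;
                if (combined < 0) {
                    combined = ~combined;
                }
                if (!filter.get(combined % bits)) {
                    return false; // any unset bit proves the element was never added
                }
            }
            return true;
        }
    }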


http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c76741b..dad2d99 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,7 @@ public final class ConfigConstants {
 	 * for hybrid hash joins. 
 	 */
 	public static final String DEFAULT_SPILLING_MAX_FAN_KEY = "taskmanager.runtime.max-fan";
-
+	
 	/**
 	 * Key for the default spilling threshold. When more than the threshold memory of the sort buffers is full, the
 	 * sorter will start spilling to disk.
@@ -190,6 +190,12 @@ public final class ConfigConstants {
 	 * A value of 0 indicates infinite waiting.
 	 */
 	public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
+	
+	/**
+	 * Whether to use a bloom filter to filter probe records before they are spilled to disk during
+	 * the probe phase, minimizing the number of spilled probe records.
+	 */
+	public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
 
 	// ------------------------ YARN Configuration ------------------------
 
@@ -552,6 +558,11 @@ public final class ConfigConstants {
 	 * The default timeout for filesystem stream opening: infinite (means max long milliseconds).
 	 */
 	public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
+	
+	/**
+	 * By default the bloom filter is enabled for hash joins, as it improves hash join performance most of the time.
+	 */
+	public static final boolean DEFAULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
 
 	// ------------------------ YARN Configuration ------------------------
 

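The new key defaults to true. A minimal sketch for disabling it programmatically, mirroring how
the benchmark later in this commit does it (in a real deployment the key would normally be set
in the Flink configuration file instead):

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;

    public class BloomFilterToggle {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // the filter is on by default; false falls back to unconditionally spilling probe records
            conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, false);
            GlobalConfiguration.includeConfiguration(conf);
        }
    }
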
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 7f07cfb..4a57986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.hash;
 
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.iterative.io.HashPartitionIterator;
+import org.apache.flink.runtime.operators.util.BloomFilter;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -194,6 +195,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * Constant for the bucket status, indicating that the bucket is in memory.
 	 */
 	private static final byte BUCKET_STATUS_IN_MEMORY = 0;
+
+	/**
+	 * Constant for the bucket status, indicating that the bucket has filter.
+	 */
+	private static final byte BUCKET_STATUS_IN_FILTER = 1;
 	
 	// ------------------------------------------------------------------------
 	//                              Members
@@ -348,6 +354,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	
 	private boolean running = true;
 
+	private BloomFilter bloomFilter;
+	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
 	// ------------------------------------------------------------------------
@@ -469,12 +477,19 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				this.recordComparator.setReference(next);
 				this.bucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
 				return true;
-			}
-			else {
-				p.insertIntoProbeBuffer(next);
+			} else {
+				byte status = bucket.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+				if (status == BUCKET_STATUS_IN_FILTER) {
+					this.bloomFilter.setBitsLocation(bucket, bucketInSegmentOffset + BUCKET_HEADER_LENGTH);
+					// Use the bloom filter to drop probe records that cannot match any key in the spilled build-side buckets.
+					if (this.bloomFilter.testHash(hash)) {
+						p.insertIntoProbeBuffer(next);
+					}
+				} else {
+					p.insertIntoProbeBuffer(next);
+				}
 			}
 		}
-		
 		// -------------- partition done ---------------
 		
 		return false;
@@ -710,6 +725,27 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			p.finalizeBuildPhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
 		}
 	}
+
+	private void initBloomFilter(int numBuckets) {
+		int avgNumRecordsPerBucket = getEstimatedMaxBucketEntries(this.availableMemory.size(), this.segmentSize,
+			numBuckets, this.avgRecordLen);
+		// Use the whole bucket, except for the bucket header, as the bloom filter's bit set.
+		int byteSize = HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH;
+		this.bloomFilter = new BloomFilter(avgNumRecordsPerBucket, byteSize);
+		if (LOG.isDebugEnabled()) {
+			double fpp = BloomFilter.estimateFalsePositiveProbability(avgNumRecordsPerBucket, byteSize << 3);
+			LOG.debug(String.format("Created BloomFilter with average input entries per bucket [%d], byte size [%d], false positive probability [%f].",
+				avgNumRecordsPerBucket, byteSize, fpp));
+		}
+	}
+
+	private final int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+		final long totalSize = ((long) bufferSize) * numBuffers;
+		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+		final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
+		final long maxNumRecordsPerBucket = maxNumRecordsStorable / numBuckets;
+		return (int) maxNumRecordsPerBucket;
+	}
 	
 	/**
 	 * @param p
@@ -816,7 +852,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				final int hashCode = hash(btComparator.hash(rec), nextRecursionLevel);
 				insertIntoTable(rec, hashCode);
 			}
-
+			
 			// finalize the partitions
 			for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
 				HashPartition<BT, PT> part = this.partitionsBeingBuilt.get(i);
@@ -853,6 +889,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		if (pointer != -1) {
 			// record was inserted into an in-memory partition. a pointer must be inserted into the buckets
 			insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer);
+		} else {
+			byte status = bucket.get(bucketInSegmentPos + HEADER_STATUS_OFFSET);
+			if (status == BUCKET_STATUS_IN_FILTER) {
+				// The partition has been spilled, so point the bloom filter at this bucket's bits
+				// and add the record's hash code to the filter.
+				this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+				this.bloomFilter.addHash(hashCode);
+			}
 		}
 	}
 	
@@ -1047,6 +1091,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		this.buckets = table;
 		this.numBuckets = numBuckets;
+		
+		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEFAULT_HASHJOIN_ENABLE_BLOOMFILTER);
+		if (enableBloomFilter) {
+			initBloomFilter(numBuckets);
+		}
 	}
 	
 	/**
@@ -1088,6 +1138,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		final HashPartition<BT, PT> p = partitions.get(largestPartNum);
 		
+		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEFAULT_HASHJOIN_ENABLE_BLOOMFILTER);
+		if (enableBloomFilter) {
+			buildBloomFilterForBucketsInPartition(largestPartNum, p);
+		}
+		
 		// spill the partition
 		int numBuffersFreed = p.spillPartition(this.availableMemory, this.ioManager, 
 										this.currentEnumerator.next(), this.writeBehindBuffers);
@@ -1101,6 +1157,81 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		return largestPartNum;
 	}
 	
+	protected final void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
+		// Find all the buckets that belong to this partition and build a bloom filter for each bucket (including its overflow buckets).
+		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
+		for (MemorySegment segment : this.buckets) {
+			for (int i = 0; i < bucketsPerSegment; i++) {
+				final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+				byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+				if (partitionNumber == partNum) {
+					byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+					if (status == BUCKET_STATUS_IN_MEMORY) {
+						buildBloomFilterForBucket(bucketInSegmentOffset, segment, partition);
+					}
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Uses all the bucket memory except the bucket header as the bit set of the bloom filter, and
+	 * inserts the hash codes of the build-side records into the filter.
+	 *
+	 * @param bucketInSegmentPos offset of the bucket within its memory segment
+	 * @param bucket the memory segment containing the bucket
+	 * @param p the partition the bucket belongs to
+	 */
+	final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+		int[] hashCodes = new int[count];
+		// The hash codes and the bloom filter occupy the same bytes, so read all hash codes out first and then write the bloom filter over them.
+		for (int i = 0; i < count; i++) {
+			hashCodes[i] = bucket.getInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+		}
+		this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+		for (int hashCode : hashCodes) {
+			this.bloomFilter.addHash(hashCode);
+		}
+		buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
+	}
+	
+	private final void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+		int totalCount = 0;
+		boolean skip = false;
+		long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
+		while (forwardPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
+			final int overflowSegNum = (int) (forwardPointer >>> 32);
+			if (overflowSegNum < 0 || overflowSegNum >= p.numOverflowSegments) {
+				skip = true;
+				break;
+			}
+			MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
+			int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
+			
+			final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
+			totalCount += count;
+			// The bloom filter has 112 * 8 bits per bucket; once the expected number of entries exceeds 2048,
+			// the false positive probability rises above 0.9, which makes the bloom filter an overhead instead of an optimization.
+			if (totalCount > 2048) {
+				skip = true;
+				break;
+			}
+			
+			for (int i = 0; i < count; i++) {
+				int hashCode = overflowSegment.getInt(bucketInOverflowSegmentOffset + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+				this.bloomFilter.addHash(hashCode);
+			}
+			
+			forwardPointer = overflowSegment.getLong(bucketInOverflowSegmentOffset + HEADER_FORWARD_OFFSET);
+			
+		}
+		
+		if (!skip) {
+			bucket.put(bucketInSegmentPos + HEADER_STATUS_OFFSET, BUCKET_STATUS_IN_FILTER);
+		}
+	}
+
 	/**
 	 * This method makes sure that at least a certain number of memory segments is in the list of free segments.
 	 * Free memory can be in the list of free segments, or in the return-queue where segments used to write behind are

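As a sanity check of the 2048-entry cutoff in buildBloomFilterForExtraOverflowSegments above,
one can plug the per-bucket sizes into the estimator that the new BloomFilter class (next file)
provides. A sketch, assuming the 112-byte bucket body (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH)
stated in this patch:

    import org.apache.flink.runtime.operators.util.BloomFilter;

    public class FppSanityCheck {
        public static void main(String[] args) {
            int byteSize = 112;          // bucket body used as the filter's bit set
            int bitSize = byteSize << 3; // 896 bits
            // with 2048 expected entries the optimal hash function count degenerates to 1,
            // and the estimated false positive probability comes out at roughly 0.898
            System.out.println(BloomFilter.estimateFalsePositiveProbability(2048, bitSize));
        }
    }
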
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
new file mode 100644
index 0000000..947a56b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.memory.MemorySegment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A BloomFilter is a probabilistic data structure for set membership checks. Bloom filters are
+ * highly space efficient compared to a HashSet. Because of their probabilistic nature, false
+ * positives (an element is not in the filter, but test() says true) are possible, while false
+ * negatives are not (if an element is present, test() will never say false). The false positive
+ * probability is configurable: the lower it is chosen, the greater the storage requirement.
+ * Bloom filters are sensitive to the number of inserted elements, so the expected number of
+ * entries must be specified at creation time; if the number of insertions exceeds it, the false
+ * positive probability increases accordingly.
+ * <p/>
+ * Internally, this implementation stores its BitSet in a MemorySegment. Both BloomFilter and
+ * BitSet are designed to switch between different MemorySegments, so that Flink can reuse the
+ * same BloomFilter/BitSet instance for different bloom filters.
+ * <p/>
+ * Parts of this class are adapted from the implementation in the Apache Hive project:
+ * https://github.com/apache/hive/blob/master/common/src/java/org/apache/hive/common/util/BloomFilter.java
+ */
+public class BloomFilter {
+	
+	protected BitSet bitSet;
+	protected int expectedEntries;
+	protected int numHashFunctions;
+	
+	public BloomFilter(int expectedEntries, int byteSize) {
+		checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+		this.expectedEntries = expectedEntries;
+		this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, byteSize << 3);
+		this.bitSet = new BitSet(byteSize);
+	}
+	
+	public void setBitsLocation(MemorySegment memorySegment, int offset) {
+		this.bitSet.setMemorySegment(memorySegment, offset);
+	}
+	
+	/**
+	 * Computes the optimal number of bits for the given number of input entries and the expected
+	 * false positive probability.
+	 *
+	 * @param inputEntries expected number of inserted entries
+	 * @param fpp desired false positive probability
+	 * @return optimal number of bits
+	 */
+	public static int optimalNumOfBits(long inputEntries, double fpp) {
+		int numBits = (int) (-inputEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+		return numBits;
+	}
+	
+	/**
+	 * Estimates the false positive probability for the given number of input entries and bit size.
+	 * Note: this is only the mathematical expectation; the actual fpp of a concrete run is not
+	 * guaranteed to stay below the returned value.
+	 *
+	 * @param inputEntries expected number of inserted entries
+	 * @param bitSize size of the bit set, in bits
+	 * @return the estimated false positive probability
+	 */
+	public static double estimateFalsePositiveProbability(long inputEntries, int bitSize) {
+		int numFunction = optimalNumOfHashFunctions(inputEntries, bitSize);
+		double p = Math.pow(Math.E, -(double) numFunction * inputEntries / bitSize);
+		double estimatedFPP = Math.pow(1 - p, numFunction);
+		return estimatedFPP;
+	}
+	
+	/**
+	 * Computes the number of hash functions that minimizes the false positive probability for the
+	 * given number of input entries and bit size.
+	 *
+	 * @param expectEntries expected number of inserted entries
+	 * @param bitSize size of the bit set, in bits
+	 * @return optimal number of hash functions
+	 */
+	static int optimalNumOfHashFunctions(long expectEntries, long bitSize) {
+		return Math.max(1, (int) Math.round((double) bitSize / expectEntries * Math.log(2)));
+	}
+	
+	public void addHash(int hash32) {
+		int hash1 = hash32;
+		int hash2 = hash32 >>> 16;
+		
+		for (int i = 1; i <= numHashFunctions; i++) {
+			int combinedHash = hash1 + (i * hash2);
+			// hashcode should be positive, flip all the bits if it's negative
+			if (combinedHash < 0) {
+				combinedHash = ~combinedHash;
+			}
+			int pos = combinedHash % bitSet.bitSize();
+			bitSet.set(pos);
+		}
+	}
+		
+	public boolean testHash(int hash32) {
+		int hash1 = hash32;
+		int hash2 = hash32 >>> 16;
+		
+		for (int i = 1; i <= numHashFunctions; i++) {
+			int combinedHash = hash1 + (i * hash2);
+			// hashcode should be positive, flip all the bits if it's negative
+			if (combinedHash < 0) {
+				combinedHash = ~combinedHash;
+			}
+			int pos = combinedHash % bitSet.bitSize();
+			if (!bitSet.get(pos)) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	public void reset() {
+		this.bitSet.clear();
+	}
+	
+	@Override
+	public String toString() {
+		StringBuilder output = new StringBuilder();
+		output.append("BloomFilter:\n");
+		output.append("\thash function number:").append(numHashFunctions).append("\n");
+		output.append(bitSet);
+		return output.toString();
+	}
+	
+	/**
+	 * Bare metal bit set implementation. For performance reasons, this implementation does not check
+	 * for index bounds nor expand the bit set size if the specified index is greater than the size.
+	 */
+	public class BitSet {
+		private MemorySegment memorySegment;
+		// MemorySegment byte array offset.
+		private int offset;
+		// MemorySegment byte size.
+		private int length;
+		private final int LONG_POSITION_MASK = 0xffffffc0;
+		
+		public BitSet(int byteSize) {
+			Preconditions.checkArgument(byteSize > 0, "byte size should be greater than 0.");
+			Preconditions.checkArgument(byteSize << 29 == 0, "byte size should be an integral multiple of the long size (8 bytes).");
+			this.length = byteSize;
+		}
+		
+		public void setMemorySegment(MemorySegment memorySegment, int offset) {
+			this.memorySegment = memorySegment;
+			this.offset = offset;
+		}
+		
+		/**
+		 * Sets the bit at specified index.
+		 *
+		 * @param index - position
+		 */
+		public void set(int index) {
+			int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+			long current = memorySegment.getLong(offset + longIndex);
+			current |= (1L << index);
+			memorySegment.putLong(offset + longIndex, current);
+		}
+		
+		/**
+		 * Returns true if the bit is set in the specified index.
+		 *
+		 * @param index - position
+		 * @return - value at the bit position
+		 */
+		public boolean get(int index) {
+			int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+			long current = memorySegment.getLong(offset + longIndex);
+			return (current & (1L << index)) != 0;
+		}
+		
+		/**
+		 * Number of bits
+		 */
+		public int bitSize() {
+			return length << 3;
+		}
+		
+		public MemorySegment getMemorySegment() {
+			return this.memorySegment;
+		}
+		
+		/**
+		 * Clear the bit set.
+		 */
+		public void clear() {
+			long zeroValue = 0L;
+			for (int i = 0; i < (length / 8); i++) {
+				memorySegment.putLong(offset + i * 8, zeroValue);
+			}
+		}
+		
+		@Override
+		public String toString() {
+			StringBuilder output = new StringBuilder();
+			output.append("BitSet:\n");
+			output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
+			output.append("\tOffset:").append(offset).append("\n");
+			output.append("\tLength:").append(length).append("\n");
+			return output.toString();
+		}
+	}
+}
\ No newline at end of file
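
A minimal usage sketch of the class above, along the lines of the unit test at the end of this
commit (the sizes are illustrative; the byte size must be a positive multiple of 8):

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.runtime.operators.util.BloomFilter;

    public class BloomFilterUsageSketch {
        public static void main(String[] args) {
            int expectedEntries = 1024;
            int byteSize = 128;
            MemorySegment segment = new MemorySegment(new byte[byteSize]);

            BloomFilter filter = new BloomFilter(expectedEntries, byteSize);
            filter.setBitsLocation(segment, 0); // the filter can later be re-pointed at another segment

            filter.addHash("some-key".hashCode());
            System.out.println(filter.testHash("some-key".hashCode()));  // always true
            System.out.println(filter.testHash("other-key".hashCode())); // false, barring a false positive
        }
    }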

http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..452e4c1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class MutableHashTablePerformanceBenchmark {
+	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+	
+	private MemoryManager memManager;
+	private IOManager ioManager;
+	
+	private TypeSerializer<StringPair> pairBuildSideAccessor;
+	private TypeSerializer<StringPair> pairProbeSideAccessor;
+	private TypeComparator<StringPair> pairBuildSideComparator;
+	private TypeComparator<StringPair> pairProbeSideComparator;
+	private TypePairComparator<StringPair, StringPair> pairComparator;
+	
+	private static final String COMMENT = "this comment should contain 96 bytes of data; 100 with another integer value and separator char.";
+	
+	
+	@Before
+	public void setup() {
+		this.pairBuildSideAccessor = new StringPairSerializer();
+		this.pairProbeSideAccessor = new StringPairSerializer();
+		this.pairBuildSideComparator = new StringPairComparator();
+		this.pairProbeSideComparator = new StringPairComparator();
+		this.pairComparator = new StringPairPairComparator();
+		
+		this.memManager = new DefaultMemoryManager(64 * 1024 * 1024, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+	
+	@After
+	public void tearDown() {
+		// shut down I/O manager and Memory Manager and verify the correct shutdown
+		this.ioManager.shutdown();
+		if (!this.ioManager.isProperlyShutDown()) {
+			fail("I/O manager was not properly shut down.");
+		}
+		if (!this.memManager.verifyEmpty()) {
+			fail("Not all memory was properly released to the memory manager --> Memory Leak.");
+		}
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance1() throws IOException {
+		// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 10;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 500000;
+		
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin2:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double) probeScope / buildScope) * 100 + "%");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance2() throws IOException {
+		// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 5;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 1000000;
+		
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin3:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double) probeScope / buildScope) * 100 + "%");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance3() throws IOException {
+		// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 2;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 2500000;
+		
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		
+		System.out.println("HybridHashJoin4:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double) probeScope / buildScope) * 100 + "%");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance4() throws IOException {
+		// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 1;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = probeSize / buildStep;
+		
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin5:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double) probeScope / buildScope) * 100 + "%");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
+		int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
+		
+		InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
+		InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
+		
+		Configuration conf = new Configuration();
+		conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
+		GlobalConfiguration.includeConfiguration(conf);
+		
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// allocate all available pages; 33 is the minimum number of pages required to perform the hash join on these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+		} catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return -1;
+		}
+		
+		// ----------------------------------------------------------------------------------------
+		
+		long start = System.currentTimeMillis();
+		final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
+			this.pairBuildSideAccessor, this.pairProbeSideAccessor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager);
+		join.open(buildIterator, probeIterator);
+		
+		final StringPair recordReuse = new StringPair();
+		int numRecordsInJoinResult = 0;
+		
+		while (join.nextRecord()) {
+			MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
+		
+		join.close();
+		long cost = System.currentTimeMillis() - start;
+		// ----------------------------------------------------------------------------------------
+		
+		this.memManager.release(join.getFreedMemory());
+		return cost;
+	}
+	
+	
+	static class InputIterator implements MutableObjectIterator<StringPair> {
+		
+		private int numLeft;
+		private int distance;
+		private int scope;
+		
+		public InputIterator(int size, int distance, int scope) {
+			this.numLeft = size;
+			this.distance = distance;
+			this.scope = scope;
+		}
+		
+		@Override
+		public StringPair next(StringPair reuse) throws IOException {
+			if (this.numLeft > 0) {
+				numLeft--;
+				int currentKey = (numLeft * distance) % scope;
+				reuse.setKey(Integer.toString(currentKey));
+				reuse.setValue(COMMENT);
+				return reuse;
+			} else {
+				return null;
+			}
+		}
+		
+		@Override
+		public StringPair next() throws IOException {
+			return next(new StringPair());
+		}
+	}
+}
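
For intuition on the filter rates claimed in the comments above: with buildStep = 10 the build
side contains only keys that are multiples of 10, while the probe side draws every key in
[0, buildSize). A back-of-the-envelope check of the first benchmark's expected result (assumed
arithmetic, not part of the patch):

    public class ExpectedResultCheck {
        public static void main(String[] args) {
            long buildStep = 10;         // build keys are multiples of 10
            long probeScope = 1_000_000; // probe keys cover [0, 1000000)
            long probeSize = 5_000_000;  // so each probe key occurs 5 times
            long matchingKeys = probeScope / buildStep; // 100000 keys hit the build side
            long repetitions = probeSize / probeScope;  // 5
            System.out.println(matchingKeys * repetitions); // 500000 == expectedResult
        }
    }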

http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
new file mode 100644
index 0000000..cbbeca0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BloomFilterTest {
+	
+	private static BloomFilter bloomFilter;
+	private static final int INPUT_SIZE = 1024;
+	private static final double FALSE_POSITIVE_PROBABILITY = 0.05;
+	
+	@BeforeClass
+	public static void init() {
+		int bitsSize = BloomFilter.optimalNumOfBits(INPUT_SIZE, FALSE_POSITIVE_PROBABILITY);
+		bitsSize = bitsSize + (Long.SIZE - (bitsSize % Long.SIZE));
+		int byteSize = bitsSize >>> 3;
+		MemorySegment memorySegment = new MemorySegment(new byte[byteSize]);
+		bloomFilter = new BloomFilter(INPUT_SIZE, byteSize);
+		bloomFilter.setBitsLocation(memorySegment, 0);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments1() {
+		new BloomFilter(-1, 128);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments2() {
+		new BloomFilter(0, 128);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments3() {
+		new BloomFilter(1024, -1);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments4() {
+		new BloomFilter(1024, 0);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments5() {
+		new BloomFilter(1024, 21);
+	}
+	
+	@Test
+	public void testBloomNumBits() {
+		assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));
+		assertEquals(0, BloomFilter.optimalNumOfBits(0, 1));
+		assertEquals(0, BloomFilter.optimalNumOfBits(1, 1));
+		assertEquals(7, BloomFilter.optimalNumOfBits(1, 0.03));
+		assertEquals(72, BloomFilter.optimalNumOfBits(10, 0.03));
+		assertEquals(729, BloomFilter.optimalNumOfBits(100, 0.03));
+		assertEquals(7298, BloomFilter.optimalNumOfBits(1000, 0.03));
+		assertEquals(72984, BloomFilter.optimalNumOfBits(10000, 0.03));
+		assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03));
+		assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+		assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+	}
+	
+	@Test
+	public void testBloomFilterNumHashFunctions() {
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(-1, -1));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(0, 0));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 0));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 10));
+		assertEquals(7, BloomFilter.optimalNumOfHashFunctions(10, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10000, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100000, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000000, 100));
+	}
+	
+	@Test
+	public void testBloomFilterFalsePositiveProbability() {
+		assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+		assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+		assertEquals(4792529, BloomFilter.optimalNumOfBits(1000000, 0.1));
+		assertEquals(3349834, BloomFilter.optimalNumOfBits(1000000, 0.2));
+		assertEquals(2505911, BloomFilter.optimalNumOfBits(1000000, 0.3));
+		assertEquals(1907139, BloomFilter.optimalNumOfBits(1000000, 0.4));
+		
+		// Make sure the estimated fpp error is less than 1%.
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 7298440) - 0.03) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 6235224) - 0.05) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 4792529) - 0.1) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 3349834) - 0.2) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 2505911) - 0.3) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 1907139) - 0.4) < 0.01);
+	}
+	
+	@Test
+	public void testHashcodeInput() {
+		bloomFilter.reset();
+		int val1 = "val1".hashCode();
+		int val2 = "val2".hashCode();
+		int val3 = "val3".hashCode();
+		int val4 = "val4".hashCode();
+		int val5 = "val5".hashCode();
+		
+		assertFalse(bloomFilter.testHash(val1));
+		assertFalse(bloomFilter.testHash(val2));
+		assertFalse(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val1);
+		assertTrue(bloomFilter.testHash(val1));
+		assertFalse(bloomFilter.testHash(val2));
+		assertFalse(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val2);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertFalse(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val3);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertTrue(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val4);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertTrue(bloomFilter.testHash(val3));
+		assertTrue(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val5);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertTrue(bloomFilter.testHash(val3));
+		assertTrue(bloomFilter.testHash(val4));
+		assertTrue(bloomFilter.testHash(val5));
+	}
+}
\ No newline at end of file
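
The expected values asserted above are consistent with the textbook bloom filter sizing formulas, m = -n * ln(p) / (ln 2)^2 bits and k = max(1, round(m/n * ln 2)) hash functions. A minimal sketch of those formulas (an assumption about BloomFilter's internals, not code from this commit):

    // m = -n * ln(p) / (ln 2)^2, truncated to int
    static int optimalNumOfBits(long expectedEntries, double fpp) {
        return (int) (-expectedEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    // k = max(1, round(m / n * ln 2))
    static int optimalNumOfHashFunctions(long expectedEntries, long bitSize) {
        return Math.max(1, (int) Math.round((double) bitSize / expectedEntries * Math.log(2)));
    }

Plugging in n = 1,000,000 and p = 0.03 yields 7,298,440 bits, matching testBloomNumBits.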


[2/5] flink git commit: [hotfix] Increase timeout for YARN tests to 180 seconds to prevent occasional CI failures.

Posted by se...@apache.org.
[hotfix] Increase timeout for YARN tests to 180 seconds to prevent occasional CI failures.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5b84b2b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5b84b2b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5b84b2b

Branch: refs/heads/master
Commit: a5b84b2b8284bcdaa649050b5090d79d8b58344c
Parents: 5a788ec
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 6 16:55:32 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 16:56:36 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/yarn/YarnTestBase.java     | 36 ++++++++++++--------
 1 file changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5b84b2b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 23b8940..2d22700 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -425,13 +425,15 @@ public abstract class YarnTestBase {
 		System.setErr(new PrintStream(errContent));
 
 
-		final int START_TIMEOUT_SECONDS = 60;
-
+		// we wait for at most three minutes
+		final int START_TIMEOUT_SECONDS = 180;
+		final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000);
+		
 		Runner runner = new Runner(args, type);
 		runner.start();
 
 		boolean expectedStringSeen = false;
-		for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
+		do {
 			sleep(1000);
 			String outContentString = outContent.toString();
 			String errContentString = errContent.toString();
@@ -448,8 +450,7 @@ public abstract class YarnTestBase {
 				}
 			}
 			// check output for correct TaskManager startup.
-			if(outContentString.contains(terminateAfterString)
-					|| errContentString.contains(terminateAfterString) ) {
+			if (outContentString.contains(terminateAfterString) || errContentString.contains(terminateAfterString) ) {
 				expectedStringSeen = true;
 				LOG.info("Found expected output in redirected streams");
 				// send "stop" command to command line interface
@@ -457,23 +458,28 @@ public abstract class YarnTestBase {
 				runner.sendStop();
 				// wait for the thread to stop
 				try {
-					runner.join(10000);
-				} catch (InterruptedException e) {
+					runner.join(30000);
+				}
+				catch (InterruptedException e) {
 					LOG.debug("Interrupted while stopping runner", e);
 				}
 				LOG.warn("RunWithArgs runner stopped.");
-				break;
 			}
-			// check if thread died
-			if(!runner.isAlive()) {
-				sendOutput();
-				if(runner.getReturnValue() != 0) {
-					Assert.fail("Runner thread died before the test was finished. Return value = " + runner.getReturnValue());
-				} else {
-					LOG.info("Runner stopped earlier than expected with return value = 0");
+			else {
+				// check if thread died
+				if (!runner.isAlive()) {
+					sendOutput();
+					if (runner.getReturnValue() != 0) {
+						Assert.fail("Runner thread died before the test was finished. Return value = "
+								+ runner.getReturnValue());
+					} else {
+						LOG.info("Runner stopped earlier than expected with return value = 0");
+					}
 				}
 			}
 		}
+		while (!expectedStringSeen && System.currentTimeMillis() < deadline);
+		
 		sendOutput();
 		Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
 				"expected string did not show up", expectedStringSeen);


[4/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 7172887..d302487 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -238,7 +238,7 @@ public class ReusingReOpenableHashTableITCase {
 				new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 		
 		iterator.open();
 		// do first join with both inputs
@@ -276,7 +276,7 @@ public class ReusingReOpenableHashTableITCase {
 	//
 	//
 	
-	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
 		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
 		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -289,8 +289,7 @@ public class ReusingReOpenableHashTableITCase {
 	}
 	
 	@Test
-	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
-	{
+	public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
 		// the following two values are known to have a hash-code collision on the initial level.
 		// we use them to make sure one partition grows over-proportionally large
 		final int REPEATED_VALUE_1 = 40559;
@@ -311,9 +310,6 @@ public class ReusingReOpenableHashTableITCase {
 		builds.add(build2);
 		builds.add(build3);
 		MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
-	
-		
-		
 
 		// allocate the memory for the HashTable
 		List<MemorySegment> memSegments;
@@ -333,7 +329,7 @@ public class ReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
+				memSegments, ioManager, true);
 		
 		for(int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
@@ -347,9 +343,8 @@ public class ReusingReOpenableHashTableITCase {
 			Record record;
 			final Record recordReuse = new Record();
 
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
 		
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -369,10 +364,10 @@ public class ReusingReOpenableHashTableITCase {
 				
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 				
 				map.put(key, contained);
@@ -449,8 +444,9 @@ public class ReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
+				memSegments, ioManager, true);
+		
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
 			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
@@ -463,7 +459,7 @@ public class ReusingReOpenableHashTableITCase {
 
 			while (join.nextRecord())
 			{	
-				int numBuildValues = 0;
+				long numBuildValues = 0;
 				
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -483,10 +479,10 @@ public class ReusingReOpenableHashTableITCase {
 				
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 				
 				map.put(key, contained);
@@ -526,5 +522,4 @@ public class ReusingReOpenableHashTableITCase {
 		}
 		return copy;
 	}
-	
 }
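
The boxing cleanup in this file relies on plain autoboxing, which the compiler lowers to the same Long.valueOf calls; a two-line sketch:

    long numBuildValues = 3L;
    Long contained = numBuildValues;        // autoboxed via Long.valueOf
    contained = contained + numBuildValues; // unboxes, adds, re-boxes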

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 0aab5fe..642ac7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -69,11 +70,11 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	private final List<UnilateralSortMerger<Record>> sorters;
 	
 	private final AbstractInvokable owner;
-	
-	private final Configuration config;
-	
+
 	private final TaskConfig taskConfig;
 	
+	private final TaskManagerRuntimeInfo taskManageInfo;
+	
 	protected final long perSortMem;
 
 	protected final double perSortFractionMem;
@@ -111,11 +112,9 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
 		
 		this.owner = new DummyInvokable();
-		
-		this.config = new Configuration();
-		this.taskConfig = new TaskConfig(this.config);
-
+		this.taskConfig = new TaskConfig(new Configuration());
 		this.executionConfig = executionConfig;
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
 	}
 
 	@Parameterized.Parameters
@@ -279,7 +278,10 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		return this.taskConfig;
 	}
 
-
+	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
 
 	@Override
 	public ExecutionConfig getExecutionConfig() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index b71b01e..51c7f93 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.fs.Path;
@@ -43,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.mockito.invocation.InvocationOnMock;
@@ -193,13 +193,8 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public Configuration getTaskManagerConfiguration(){
-		return new UnmodifiableConfiguration(new Configuration());
-	}
-
-	@Override
-	public String getHostname(){
-		return "localhost";
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 1e25bab..20edc20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.ResettablePactDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -54,7 +55,9 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
 	
 	protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
 	
-	protected static final int PAGE_SIZE = 32 * 1024; 
+	protected static final int PAGE_SIZE = 32 * 1024;
+
+	private final TaskManagerRuntimeInfo taskManageInfo;
 	
 	private final IOManager ioManager;
 	
@@ -110,6 +113,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
 
 		this.executionConfig = executionConfig;
 		this.comparators = new ArrayList<TypeComparator<IN>>(2);
+
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
 	}
 
 	@Parameterized.Parameters
@@ -292,6 +297,11 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactT
 	}
 
 	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
+	
+	@Override
 	public <X> MutableObjectIterator<X> getInput(int index) {
 		MutableObjectIterator<IN> in = this.input;
 		if (in == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 38d9992..7debb08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -187,7 +187,7 @@ public class HashVsSortMiniBenchmark {
 					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 							this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+							this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
 			
 			iterator.open();
 			
@@ -226,7 +226,7 @@ public class HashVsSortMiniBenchmark {
 					new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.serializer1.getSerializer(), this.comparator1, 
 						this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE);
+						this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
 			
 			iterator.open();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 44013ef..9091fa7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.JobID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -86,7 +88,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private final int bufferSize;
 
-	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
+									MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = jobConfig;
 		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();
@@ -293,13 +296,8 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public Configuration getTaskManagerConfiguration(){
-		return new UnmodifiableConfiguration(new Configuration());
-	}
-
-	@Override
-	public String getHostname(){
-		return "localhost";
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
 	}
 }
 


[5/5] flink git commit: [FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

Posted by se...@apache.org.
[FLINK-2240] [runtime] Pass flag to configure use of bloom filters through runtime configuration.

Also make sure that most tests run with bloom filters enabled, to increase test coverage.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b73b438
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b73b438
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b73b438

Branch: refs/heads/master
Commit: 0b73b4387a855627209a4dbaef930321a5090594
Parents: 61dcae3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 6 15:49:07 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 18:13:00 2015 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  15 +-
 .../flink/configuration/ConfigConstants.java    |  26 +--
 .../flink/runtime/execution/Environment.java    |  12 +-
 .../AbstractCachedBuildSideJoinDriver.java      |  20 +-
 .../flink/runtime/operators/JoinDriver.java     |  65 +++++--
 .../flink/runtime/operators/PactDriver.java     |  10 +-
 .../runtime/operators/PactTaskContext.java      |   9 +-
 .../runtime/operators/RegularPactTask.java      |  24 ++-
 .../operators/hash/HashMatchIteratorBase.java   |  11 +-
 .../operators/hash/MutableHashTable.java        | 192 ++++---------------
 .../NonReusingBuildFirstHashMatchIterator.java  |   8 +-
 ...ngBuildFirstReOpenableHashMatchIterator.java |  26 ++-
 .../NonReusingBuildSecondHashMatchIterator.java |   8 +-
 ...gBuildSecondReOpenableHashMatchIterator.java |  23 ++-
 .../hash/ReOpenableMutableHashTable.java        |  18 +-
 .../ReusingBuildFirstHashMatchIterator.java     |   8 +-
 ...ngBuildFirstReOpenableHashMatchIterator.java |  22 ++-
 .../ReusingBuildSecondHashMatchIterator.java    |   8 +-
 ...gBuildSecondReOpenableHashMatchIterator.java |  26 ++-
 .../runtime/taskmanager/RuntimeEnvironment.java |  16 +-
 .../operators/drivers/TestTaskContext.java      |   9 +
 .../MutableHashTablePerformanceBenchmark.java   |  10 +-
 .../hash/NonReusingHashMatchIteratorITCase.java |  12 +-
 .../NonReusingReOpenableHashTableITCase.java    |  33 ++--
 .../hash/ReusingHashMatchIteratorITCase.java    |  12 +-
 .../hash/ReusingReOpenableHashTableITCase.java  |  33 ++--
 .../operators/testutils/DriverTestBase.java     |  18 +-
 .../operators/testutils/MockEnvironment.java    |  11 +-
 .../testutils/UnaryOperatorTestBase.java        |  12 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   4 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  14 +-
 31 files changed, 338 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index ba541d4..53b9ae0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -244,11 +244,6 @@ free for objects created by user-defined functions. (DEFAULT: 0.7)
 This parameter is only evaluated, if `taskmanager.memory.size` is not set.
 - `jobclient.polling.interval`: The interval (in seconds) in which the client
 polls the JobManager for the status of its job (DEFAULT: 2).
-- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
-fan-out for spilling hash tables. Limits the number of file handles per operator,
-but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
-- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
-when this fraction of its memory budget is full (DEFAULT: 0.8).
 - `taskmanager.heartbeat-interval`: The interval in which the TaskManager sends
 heartbeats to the JobManager.
 - `jobmanager.max-heartbeat-delay-before-failure.msecs`: The maximum time that a
@@ -324,6 +319,16 @@ sample exceeds this value (possible because of misconfiguration of the parser),
 the sampling aborts. This value can be overridden for a specific input with the
 input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
 
+### Runtime Algorithms
+
+- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and
+fan-out for spilling hash tables. Limits the number of file handles per operator,
+but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128).
+- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling
+when this fraction of its memory budget is full (DEFAULT: 0.8).
+- `taskmanager.runtime.hashjoin-bloom-filters`: If true, the hash join uses bloom filters to pre-filter records against spilled partitions. (DEFAULT: true)
+
+
 ## YARN
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index dad2d99..d145eb2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -172,6 +172,8 @@ public final class ConfigConstants {
 	 */
 	public static final String TASK_MANAGER_MAX_REGISTRATION_DURATION = "taskmanager.maxRegistrationDuration";
 
+	// --------------------------- Runtime Algorithms -------------------------------
+	
 	/**
 	 * Parameter for the maximum fan for out-of-core algorithms.
 	 * Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
@@ -184,18 +186,17 @@ public final class ConfigConstants {
 	 * sorter will start spilling to disk.
 	 */
 	public static final String DEFAULT_SORT_SPILLING_THRESHOLD_KEY = "taskmanager.runtime.sort-spilling-threshold";
+
+	/**
+	 * Parameter to switch hash join bloom filters for spilled partitions on and off.
+	 */
+	public static final String RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY = "taskmanager.runtime.hashjoin-bloom-filters";
 	
 	/**
 	 * The config parameter defining the timeout for filesystem stream opening.
 	 * A value of 0 indicates infinite waiting.
 	 */
 	public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
-	
-	/**
-	 * While spill probe record to disk during probe phase, whether enable bloom filter to filter the probe records
-	 * to minimize the spilled probe records count.
-	 */
-	public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
 
 	// ------------------------ YARN Configuration ------------------------
 
@@ -543,6 +544,13 @@ public final class ConfigConstants {
 	 * The default task manager's maximum registration duration
 	 */
 	public static final String DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION = "Inf";
+
+	// ------------------------ Runtime Algorithms ------------------------
+	
+	/**
+	 * The default for the switch that enables hash join bloom filters on spilled partitions.
+	 */
+	public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;
 	
 	/**
 	 * The default value for the maximum spilling fan in/out.
@@ -558,15 +566,9 @@ public final class ConfigConstants {
 	 * The default timeout for filesystem stream opening: infinite (means max long milliseconds).
 	 */
 	public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
-	
-	/**
-	 * Enable bloom filter for hash join as it promote hash join performance most of the time.
-	 */
-	public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
 
 	// ------------------------ YARN Configuration ------------------------
 
-
 	/**
 	 * Minimum amount of Heap memory to subtract from the requested TaskManager size.
 	 * We came up with these values experimentally.
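
End to end, the renamed switch is set in flink-conf.yaml with the key documented above:

    taskmanager.runtime.hashjoin-bloom-filters: false

and read back through the usual Configuration accessors, exactly as the driver changes below do:

    boolean useBloomFilters = config.getBoolean(
            ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
            ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);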

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index af29560..c742ce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -72,14 +73,11 @@ public interface Environment {
 	Configuration getTaskConfiguration();
 
 	/**
-	 * @return The task manager configuration
-	 */
-	Configuration getTaskManagerConfiguration();
-
-	/**
-	 * @return Hostname of the task manager
+	 * Gets the task manager info, with configuration and hostname.
+	 * 
+	 * @return The task manager info, with configuration and hostname. 
 	 */
-	String getHostname();
+	TaskManagerRuntimeInfo getTaskManagerInfo();
 
 	/**
 	 * Returns the job-wide configuration object that was attached to the JobGraph.
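
Callers of the two removed methods now go through the info object. A migration sketch (getConfiguration() is confirmed by the driver changes below; getHostname() is assumed from the TaskManagerRuntimeInfo constructor used in the tests):

    TaskManagerRuntimeInfo tmInfo = getEnvironment().getTaskManagerInfo();
    Configuration tmConfig = tmInfo.getConfiguration(); // was getTaskManagerConfiguration()
    String hostname = tmInfo.getHostname();             // was getHostname(); assumed accessor name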

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index aff8d01..4096f0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator;
@@ -74,7 +75,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 				this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
 
 		double availableMemory = config.getRelativeMemoryDriver();
-
+		boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
+				ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
+				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
+		
 		ExecutionConfig executionConfig = taskContext.getExecutionConfig();
 		objectReuseEnabled = executionConfig.isObjectReuseEnabled();
 
@@ -89,7 +93,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 
 			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -102,7 +107,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 			} else {
 				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -118,7 +124,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 
 			} else if (buildSideIndex == 1 && probeSideIndex == 0) {
@@ -131,7 +138,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 						this.taskContext.getMemoryManager(),
 						this.taskContext.getIOManager(),
 						this.taskContext.getOwningNepheleTask(),
-						availableMemory);
+						availableMemory,
+						hashJoinUseBitMaps);
 
 			} else {
 				throw new Exception("Error: Inconsistent setup for repeatable hash join driver.");
@@ -148,12 +156,10 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo
 
 	@Override
 	public void run() throws Exception {
-
 		final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
 		final Collector<OT> collector = this.taskContext.getOutputCollector();
 		
 		while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector));
-			
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index af3da55..5df715f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -19,25 +19,27 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
-import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator;
 import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator;
+import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * The join driver implements the logic of a join operator at runtime. It instantiates either
  * hash or sort-merge based strategies to find joining pairs of records.
@@ -115,19 +117,40 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
 		}
+		
+		boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
+				ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
+				ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
 
 		// create and return joining iterator according to provided local strategy.
 		if (objectReuseEnabled) {
 			switch (ls) {
 				case MERGE:
-					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
-
+					this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
-					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new ReusingBuildFirstHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator21(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				case HYBRIDHASH_BUILD_SECOND:
-					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new ReusingBuildSecondHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				default:
 					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());
@@ -135,14 +158,32 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
 		} else {
 			switch (ls) {
 				case MERGE:
-					this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
+					this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask());
 
 					break;
 				case HYBRIDHASH_BUILD_FIRST:
-					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator21(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new NonReusingBuildFirstHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator21(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				case HYBRIDHASH_BUILD_SECOND:
-					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, this.taskContext.getOwningNepheleTask(), fractionAvailableMemory);
+					this.joinIterator = new NonReusingBuildSecondHashMatchIterator<>(in1, in2,
+							serializer1, comparator1,
+							serializer2, comparator2,
+							pairComparatorFactory.createComparator12(comparator1, comparator2),
+							memoryManager, ioManager,
+							this.taskContext.getOwningNepheleTask(),
+							fractionAvailableMemory,
+							hashJoinUseBitMaps);
 					break;
 				default:
 					throw new Exception("Unsupported driver strategy for join driver: " + ls.name());

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index a53f5bf..288f7ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -16,16 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.functions.Function;
 
-
 /**
- * The interface to be implemented by all pact drivers that run alone (or as the primary driver) in a nephele task.
- * The driver is the code that deals with everything that specific to a certain PACT. It implements the actual
- * <i>map</i> or <i>reduce</i> specific code.
+ * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
+ * A driver implements the actual code to perform a batch operation, like <i>map()</i>,
+ * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
  *
  * @see PactTaskContext
  * 
@@ -37,7 +35,7 @@ public interface PactDriver<S extends Function, OT> {
 	void setup(PactTaskContext<S, OT> context);
 	
 	/**
-	 * Gets the number of inputs (= Nephele Gates and Readers) that the task has.
+	 * Gets the number of inputs that the task has.
 	 * 
 	 * @return The number of inputs.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index bc23fa3..5c2ed67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -26,15 +26,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
 
 /**
- * A runtime task is the task that is executed by the flink engine inside a task vertex.
- * It typically has a {@link PactDriver}, and optionally multiple chained drivers. In addition, it
- * deals with the runtime setup and teardown and the control-flow logic. The latter appears especially
- * in the case of iterations.
+ * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
+ * the runtime components and configuration that it can use to fulfil its task.
  *
  * @param <S> The UDF type.
  * @param <OT> The produced data type.
@@ -44,6 +43,8 @@ import org.apache.flink.util.MutableObjectIterator;
 public interface PactTaskContext<S, OT> {
 	
 	TaskConfig getTaskConfig();
+	
+	TaskManagerRuntimeInfo getTaskManagerInfo();
 
 	ClassLoader getUserCodeClassLoader();
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 78bf383..873d948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
@@ -660,7 +661,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initInputReaders() throws Exception {
 		final int numInputs = getNumTaskInputs();
-		final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
+		final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs];
 
 		int currentReaderOffset = 0;
 
@@ -705,7 +706,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initBroadcastInputReaders() throws Exception {
 		final int numBroadcastInputs = this.config.getNumBroadcastInputs();
-		final MutableReader<?>[] broadcastInputReaders = new MutableReader[numBroadcastInputs];
+		final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];
 
 		int currentReaderOffset = config.getNumInputs();
 
@@ -737,8 +738,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
 		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-		this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
-		this.inputIterators = new MutableObjectIterator[numInputs];
+		this.inputComparators = numComparators > 0 ? new TypeComparator<?>[numComparators] : null;
+		this.inputIterators = new MutableObjectIterator<?>[numInputs];
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		
@@ -764,7 +765,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 * Creates all the serializers and iterators for the broadcast inputs.
 	 */
 	protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
-		this.broadcastInputSerializers = new TypeSerializerFactory[numBroadcastInputs];
+		this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs];
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
@@ -787,8 +788,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		final MemoryManager memMan = getMemoryManager();
 		final IOManager ioMan = getIOManager();
 
-		this.localStrategies = new CloseableInputProvider[numInputs];
-		this.inputs = new MutableObjectIterator[numInputs];
+		this.localStrategies = new CloseableInputProvider<?>[numInputs];
+		this.inputs = new MutableObjectIterator<?>[numInputs];
 		this.excludeFromReset = new boolean[numInputs];
 		this.inputIsCached = new boolean[numInputs];
 		this.inputIsAsyncMaterialized = new boolean[numInputs];
@@ -807,8 +808,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed.
 		// the second variant spills to the side and will not read unless the result is also consumed
 		// in a pipelined fashion.
-		this.resettableInputs = new SpillingResettableMutableObjectIterator[numInputs];
-		this.tempBarriers = new TempBarrier[numInputs];
+		this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs];
+		this.tempBarriers = new TempBarrier<?>[numInputs];
 
 		for (int i = 0; i < numInputs; i++) {
 			final int memoryPages;
@@ -1044,6 +1045,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	}
 
 	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return getEnvironment().getTaskManagerInfo();
+	}
+
+	@Override
 	public MemoryManager getMemoryManager() {
 		return getEnvironment().getMemoryManager();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
index 4e0112a..08b6191 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorBase.java
@@ -32,6 +32,7 @@ import java.util.List;
  * Common methods for all Hash Join Iterators.
  */
 public class HashMatchIteratorBase {
+	
 	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
 			TypeSerializer<BT> buildSideSerializer,
 			TypeComparator<BT> buildSideComparator,
@@ -41,11 +42,15 @@ public class HashMatchIteratorBase {
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction) throws MemoryAllocationException {
+			double memoryFraction,
+			boolean useBloomFilters) throws MemoryAllocationException {
 
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new MutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager,
+				useBloomFilters);
 	}
-
 }
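
For orientation, a sketch of how a driver-side caller now obtains the table through the
extended getHashJoin signature (variable names are illustrative, not part of the commit;
the memory sizing follows the two lines in the hunk above, and the call can throw
MemoryAllocationException):

    // Illustrative only: serializers, comparators, managers and the owner task
    // are assumed to be in scope, as they would be inside a join driver.
    HashMatchIteratorBase base = new HashMatchIteratorBase();
    MutableHashTable<Record, Record> table = base.getHashJoin(
            buildSideSerializer, buildSideComparator,
            probeSideSerializer, probeSideComparator,
            pairComparator,
            memManager, ioManager,
            ownerTask,
            0.7,     // fraction of the task's managed memory given to the table
            true);   // useBloomFilters: filter probe-side records of spilled partitions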

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 4a57986..b0042fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -24,10 +24,9 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,22 +45,16 @@ import org.apache.flink.runtime.operators.util.BloomFilter;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * An implementation of a Hybrid Hash Join. The join starts operating in memory and gradually starts
  * spilling contents to disk, when the memory is not sufficient. It does not need to know a priori 
  * how large the input will be.
- * <p>
- * The design of this class follows on many parts the design presented in
- * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
- * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning. 
- *<p>
- *
- *
- * <hr>
  * 
- * The layout of the buckets inside a memory segment is as follows:
+ * <p>The design of this class follows on many parts the design presented in
+ * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
+ * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.</p>
  * 
+ * <p>The layout of the buckets inside a memory segment is as follows:</p>
  * <pre>
  * +----------------------------- Bucket x ----------------------------
  * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
@@ -189,8 +182,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	private static final long BUCKET_FORWARD_POINTER_NOT_SET = ~0x0L;
 	
-//	private static final byte BUCKET_STATUS_SPILLED = 1;
-	
 	/**
 	 * Constant for the bucket status, indicating that the bucket is in memory.
 	 */
@@ -274,11 +265,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	protected final int bucketsPerSegmentBits;
 	
-	/**
+	/** 
 	 * An estimate for the average record length.
 	 */
 	private final int avgRecordLen;
 	
+	/** Flag to enable/disable bloom filters for spilled partitions */
+	private final boolean useBloomFilters;
+	
 	// ------------------------------------------------------------------------
 	
 	/**
@@ -296,8 +290,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	private HashBucketIterator<BT, PT> bucketIterator;
 	
-//	private LazyHashBucketIterator<BT, PT> lazyBucketIterator;
-	
 	/**
 	 * Iterator over the elements from the probe side.
 	 */
@@ -319,6 +311,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * of hash-codes and pointers to the elements.
 	 */
 	protected MemorySegment[] buckets;
+
+	/** The bloom filter utility used to transform hash buckets of spilled partitions into a
+	 * probabilistic filter */
+	private BloomFilter bloomFilter;
 	
 	/**
 	 * The number of buckets in the current table. The bucket array is not necessarily fully
@@ -353,25 +349,35 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	protected boolean furtherPartitioning = false;
 	
 	private boolean running = true;
-
-	private BloomFilter bloomFilter;
 	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
 	// ------------------------------------------------------------------------
+
+	public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
+							TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
+							TypePairComparator<PT, BT> comparator,
+							List<MemorySegment> memorySegments, IOManager ioManager)
+	{
+		this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
+				memorySegments, ioManager, true);
+	}
 	
 	public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
 			TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
-			TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager)
+			TypePairComparator<PT, BT> comparator,
+			List<MemorySegment> memorySegments,
+			IOManager ioManager,
+			boolean useBloomFilters)
 	{
 		this(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, comparator,
-			memorySegments, ioManager, DEFAULT_RECORD_LEN);
+			memorySegments, ioManager, DEFAULT_RECORD_LEN, useBloomFilters);
 	}
 	
 	public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer,
 			TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> comparator, List<MemorySegment> memorySegments,
-			IOManager ioManager, int avgRecordLen)
+			IOManager ioManager, int avgRecordLen, boolean useBloomFilters)
 	{
 		// some sanity checks first
 		if (memorySegments == null) {
@@ -390,6 +396,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		this.recordComparator = comparator;
 		this.availableMemory = memorySegments;
 		this.ioManager = ioManager;
+		this.useBloomFilters = useBloomFilters;
 		
 		this.avgRecordLen = avgRecordLen > 0 ? avgRecordLen : 
 				buildSideSerializer.getLength() == -1 ? DEFAULT_RECORD_LEN : buildSideSerializer.getLength();
@@ -551,16 +558,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	}
 	
 	public boolean nextRecord() throws IOException {
-		
+
 		final boolean probeProcessing = processProbeIter();
-		if(probeProcessing) {
-			return true;
-		}
-		return prepareNextPartition();
+		return probeProcessing || prepareNextPartition();
 	}
 	
-	public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException
-	{
+	public HashBucketIterator<BT, PT> getMatchesFor(PT record) throws IOException {
 		final TypeComparator<PT> probeAccessors = this.probeSideComparator;
 		final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
 		final int posHashCode = hash % this.numBuckets;
@@ -585,32 +588,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 	}
 	
-//	public LazyHashBucketIterator<BT, PT> getLazyMatchesFor(PT record) throws IOException
-//	{
-//		final TypeComparator<PT> probeAccessors = this.probeSideComparator;
-//		final int hash = hash(probeAccessors.hash(record), this.currentRecursionDepth);
-//		final int posHashCode = hash % this.numBuckets;
-//		
-//		// get the bucket for the given hash code
-//		final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
-//		final int bucketInSegmentOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
-//		final MemorySegment bucket = this.buckets[bucketArrayPos];
-//		
-//		// get the basic characteristics of the bucket
-//		final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
-//		final HashPartition<BT, PT> p = this.partitionsBeingBuilt.get(partitionNumber);
-//		
-//		// for an in-memory partition, process set the return iterators, else spill the probe records
-//		if (p.isInMemory()) {
-//			this.recordComparator.setReference(record);
-//			this.lazyBucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
-//			return this.lazyBucketIterator;
-//		}
-//		else {
-//			throw new IllegalStateException("Method is not applicable to partially spilled hash tables.");
-//		}
-//	}
-	
 	public PT getCurrentProbeRecord() {
 		return this.probeIterator.getCurrent();
 	}
@@ -739,7 +716,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 	}
 
-	final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+	private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
 		final long totalSize = ((long) bufferSize) * numBuffers;
 		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
 		final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
@@ -1092,9 +1069,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		this.buckets = table;
 		this.numBuckets = numBuckets;
 		
-		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
-			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
-		if (enableBloomFilter) {
+		if (useBloomFilters) {
 			initBloomFilter(numBuckets);
 		}
 	}
@@ -1107,8 +1082,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		this.numBuckets = 0;
 		
 		if (this.buckets != null) {
-			for (int i = 0; i < this.buckets.length; i++) {
-				this.availableMemory.add(this.buckets[i]);
+			for (MemorySegment bucket : this.buckets) {
+				this.availableMemory.add(bucket);
 			}
 			this.buckets = null;
 		}
@@ -1138,9 +1113,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		final HashPartition<BT, PT> p = partitions.get(largestPartNum);
 		
-		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
-			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
-		if (enableBloomFilter) {
+		if (useBloomFilters) {
 			buildBloomFilterForBucketsInPartition(largestPartNum, p);
 		}
 		
@@ -1196,7 +1169,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
 	}
 	
-	final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+	private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
 		int totalCount = 0;
 		boolean skip = false;
 		long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
@@ -1207,7 +1180,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				break;
 			}
 			MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
-			int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
+			int bucketInOverflowSegmentOffset = (int) forwardPointer;
 			
 			final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
 			totalCount += count;
@@ -1587,93 +1560,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 
 	} // end HashBucketIterator
-	
-
-	// ======================================================================================================
-	
-//	public static final class LazyHashBucketIterator<BT, PT> {
-//		
-//		private final TypePairComparator<PT, BT> comparator;
-//		
-//		private MemorySegment bucket;
-//		
-//		private MemorySegment[] overflowSegments;
-//		
-//		private HashPartition<BT, PT> partition;
-//		
-//		private int bucketInSegmentOffset;
-//		
-//		private int searchHashCode;
-//		
-//		private int posInSegment;
-//		
-//		private int countInSegment;
-//		
-//		private int numInSegment;
-//		
-//		private LazyHashBucketIterator(TypePairComparator<PT, BT> comparator) {
-//			this.comparator = comparator;
-//		}
-//		
-//		
-//		void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition<BT, PT> partition,
-//				int searchHashCode, int bucketInSegmentOffset) {
-//			
-//			this.bucket = bucket;
-//			this.overflowSegments = overflowSegments;
-//			this.partition = partition;
-//			this.searchHashCode = searchHashCode;
-//			this.bucketInSegmentOffset = bucketInSegmentOffset;
-//			
-//			this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-//			this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
-//			this.numInSegment = 0;
-//		}
-//
-//		public boolean next(BT target) {
-//			// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
-//			while (true) {
-//				
-//				while (this.numInSegment < this.countInSegment) {
-//					
-//					final int thisCode = this.bucket.getInt(this.posInSegment);
-//					this.posInSegment += HASH_CODE_LEN;
-//						
-//					// check if the hash code matches
-//					if (thisCode == this.searchHashCode) {
-//						// get the pointer to the pair
-//						final long pointer = this.bucket.getLong(this.bucketInSegmentOffset + 
-//													BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN));
-//						this.numInSegment++;
-//							
-//						// check whether it is really equal, or whether we had only a hash collision
-//						LazyDeSerializable lds = (LazyDeSerializable) target;
-//						lds.setDeSerializer(this.partition, this.partition.getWriteView(), pointer);
-//						if (this.comparator.equalToReference(target)) {
-//							return true;
-//						}
-//					}
-//					else {
-//						this.numInSegment++;
-//					}
-//				}
-//				
-//				// this segment is done. check if there is another chained bucket
-//				final long forwardPointer = this.bucket.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
-//				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
-//					return false;
-//				}
-//				
-//				final int overflowSegNum = (int) (forwardPointer >>> 32);
-//				this.bucket = this.overflowSegments[overflowSegNum];
-//				this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
-//				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
-//				this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-//				this.numInSegment = 0;
-//			}
-//		}
-//	} 
-	
 
 	// ======================================================================================================
 	

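To make the new constructor concrete, a minimal usage sketch (mirroring the benchmark and
IT cases further down in this commit; setup of the inputs, type utilities and memory
segments is elided, and the emit step is left as a comment):

    // A hybrid hash join driven directly, with bloom filters enabled for
    // spilled partitions.
    MutableHashTable<Record, Record> join = new MutableHashTable<Record, Record>(
            buildSideSerializer, probeSideSerializer,
            buildSideComparator, probeSideComparator, pairComparator,
            memSegments, ioManager,
            true);                      // useBloomFilters
    join.open(buildInput, probeInput);  // consumes the build side, then starts probing
    while (join.nextRecord()) {
        Record probe = join.getCurrentProbeRecord();
        // join.getMatchesFor(probe) yields the matching build-side records;
        // pair them with 'probe' and emit.
    }
    join.close();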
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
index c2d7805..5000dab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashMatchIterator.java
@@ -67,16 +67,16 @@ public class NonReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchI
 			TypePairComparator<V2, V1> pairComparator,
 			MemoryManager memManager, IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer2;
 
 		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, comparator2,
-				pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
index ee870a6..af1626a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashMatchIterator.java
@@ -48,25 +48,32 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-		throws MemoryAllocationException
-	{
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction);
+				memoryFraction, useBitmapFilters);
+		
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
 
 	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 
 	/**
@@ -76,5 +83,4 @@ public class NonReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends
 	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
index 6099ac7..83952c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashMatchIterator.java
@@ -66,16 +66,16 @@ public class NonReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatch
 			TypePairComparator<V1, V2> pairComparator,
 			MemoryManager memManager, IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
 		this.probeSideSerializer = serializer1;
 		
 		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1,
-				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
index bc7e65b..029be5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashMatchIterator.java
@@ -48,11 +48,12 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-		throws MemoryAllocationException
-	{
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+		
 		reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
 	}
 
@@ -62,12 +63,17 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
 			TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 
 	/**
@@ -77,5 +83,4 @@ public class NonReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends
 	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index 6819924..fd5fcde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -33,17 +33,12 @@ import org.apache.flink.util.MutableObjectIterator;
 
 public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT> {
 
-	/**
-	 * Channel for the spilled partitions
-	 */
+	/** Channel for the spilled partitions */
 	private final FileIOChannel.Enumerator spilledInMemoryPartitions;
 	
-	/**
-	 * Stores the initial partitions and a list of the files that contain the spilled contents
-	 */
+	/** Stores the initial partitions and a list of the files that contain the spilled contents */
 	private List<HashPartition<BT, PT>> initialPartitions;
 	
-
 	/**
 	 * The values of these variables are stored here after the initial open()
 	 * Required to restore the initial state before each additional probe phase.
@@ -58,16 +53,17 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
 			TypeComparator<BT> buildSideComparator,
 			TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> comparator,
-			List<MemorySegment> memorySegments, IOManager ioManager) {
+			List<MemorySegment> memorySegments, IOManager ioManager,
+			boolean useBitmapFilters) {
+		
 		super(buildSideSerializer, probeSideSerializer, buildSideComparator,
-				probeSideComparator, comparator, memorySegments, ioManager);
+				probeSideComparator, comparator, memorySegments, ioManager, useBitmapFilters);
 		keepBuildSidePartitions = true;
 		spilledInMemoryPartitions = ioManager.createChannelEnumerator();
 	}
 	
 	@Override
-	public void open(MutableObjectIterator<BT> buildSide,
-			MutableObjectIterator<PT> probeSide) throws IOException {
+	public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException {
 		super.open(buildSide, probeSide);
 		initialPartitions = new ArrayList<HashPartition<BT, PT>>( partitionsBeingBuilt );
 		initialPartitionFanOut = (byte) partitionsBeingBuilt.size();

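The re-openable variant exists so that iterative jobs can keep the built table across
supersteps. The pattern, as exercised by the IT cases below (the probe-input factory is a
hypothetical helper standing in for the generators used in the tests):

    // Build once, probe many times.
    ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
            buildSideSerializer, probeSideSerializer,
            buildSideComparator, probeSideComparator, pairComparator,
            memSegments, ioManager,
            true);                          // useBitmapFilters
    for (int probe = 0; probe < numProbes; probe++) {
        MutableObjectIterator<Record> probeInput = createProbeInput(probe); // assumed helper
        if (probe == 0) {
            join.open(buildInput, probeInput);  // builds the table on the first pass
        } else {
            join.reopenProbe(probeInput);       // restores the initial partitions, no rebuild
        }
        while (join.nextRecord()) {
            // consume matches as in the non-reopenable case
        }
    }
    join.close();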
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
index da76045..b4aaa95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashMatchIterator.java
@@ -71,9 +71,9 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
@@ -83,7 +83,7 @@ public class ReusingBuildFirstHashMatchIterator<V1, V2, O> extends HashMatchIter
 		this.tempBuildSideRecord = serializer1.createInstance();
 
 		this.hashJoin = getHashJoin(serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
index 5501271..714a1f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashMatchIterator.java
@@ -48,25 +48,32 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
+			double memoryFraction,
+			boolean useBitmapFilters)
 		throws MemoryAllocationException
 	{
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
 				comparator2, pairComparator, memManager, ioManager, ownerTask,
-				memoryFraction);
+				memoryFraction, useBitmapFilters);
 		reopenHashTable = (ReOpenableMutableHashTable<V1, V2>) hashJoin;
 	}
 
 	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 	
 	/**
@@ -76,5 +83,4 @@ public class ReusingBuildFirstReOpenableHashMatchIterator<V1, V2, O> extends Reu
 	public void reopenProbe(MutableObjectIterator<V2> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
index a9435ef..b7c3e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashMatchIterator.java
@@ -71,9 +71,9 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-	throws MemoryAllocationException
-	{		
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		this.memManager = memManager;
 		this.firstInput = firstInput;
 		this.secondInput = secondInput;
@@ -83,7 +83,7 @@ public class ReusingBuildSecondHashMatchIterator<V1, V2, O> extends HashMatchIte
 		this.tempBuildSideRecord = serializer2.createInstance();
 
 		this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator,
-			memManager, ioManager, ownerTask, memoryFraction);
+			memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
index 559d20a..4b4cdf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashMatchIterator.java
@@ -48,24 +48,31 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
 			MemoryManager memManager,
 			IOManager ioManager,
 			AbstractInvokable ownerTask,
-			double memoryFraction)
-		throws MemoryAllocationException
-	{
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		super(firstInput, secondInput, serializer1, comparator1, serializer2,
-				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction);
+				comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters);
+		
 		reopenHashTable = (ReOpenableMutableHashTable<V2, V1>) hashJoin;
 	}
 
 	@Override
-	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
+	public <BT, PT> MutableHashTable<BT, PT> getHashJoin(
+			TypeSerializer<BT> buildSideSerializer, TypeComparator<BT> buildSideComparator,
 			TypeSerializer<PT> probeSideSerializer, TypeComparator<PT> probeSideComparator,
 			TypePairComparator<PT, BT> pairComparator,
-			MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction)
-	throws MemoryAllocationException
-	{
+			MemoryManager memManager, IOManager ioManager,
+			AbstractInvokable ownerTask,
+			double memoryFraction,
+			boolean useBitmapFilters) throws MemoryAllocationException {
+		
 		final int numPages = memManager.computeNumberOfPages(memoryFraction);
 		final List<MemorySegment> memorySegments = memManager.allocatePages(ownerTask, numPages);
-		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer, buildSideComparator, probeSideComparator, pairComparator, memorySegments, ioManager);
+		
+		return new ReOpenableMutableHashTable<BT, PT>(buildSideSerializer, probeSideSerializer,
+				buildSideComparator, probeSideComparator, pairComparator,
+				memorySegments, ioManager, useBitmapFilters);
 	}
 	
 	/**
@@ -75,5 +82,4 @@ public class ReusingBuildSecondReOpenableHashMatchIterator<V1, V2, O> extends Re
 	public void reopenProbe(MutableObjectIterator<V1> probeInput) throws IOException {
 		reopenHashTable.reopenProbe(probeInput);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index cd6dbd6..8cfc1c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -75,9 +75,7 @@ public class RuntimeEnvironment implements Environment {
 
 	private final AccumulatorRegistry accumulatorRegistry;
 
-	private final Configuration taskManagerConfiguration;
-
-	private final String hostname;
+	private final TaskManagerRuntimeInfo taskManagerInfo;
 
 	// ------------------------------------------------------------------------
 
@@ -124,8 +122,7 @@ public class RuntimeEnvironment implements Environment {
 		this.writers = checkNotNull(writers);
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
-		this.taskManagerConfiguration = checkNotNull(taskManagerInfo).getConfiguration();
-		this.hostname = taskManagerInfo.getHostname();
+		this.taskManagerInfo = checkNotNull(taskManagerInfo);
 	}
 
 	// ------------------------------------------------------------------------
@@ -176,13 +173,8 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public Configuration getTaskManagerConfiguration(){
-		return taskManagerConfiguration;
-	}
-
-	@Override
-	public String getHostname(){
-		return hostname;
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return taskManagerInfo;
 	}
 
 	@Override

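The Environment change collapses two accessors into a single handle; call sites migrate
roughly as sketched here ('env' stands for any Environment instance; the accessors on
TaskManagerRuntimeInfo are the ones used in the hunk above):

    // before this commit:
    //   Configuration conf = env.getTaskManagerConfiguration();
    //   String hostname    = env.getHostname();
    // after:
    TaskManagerRuntimeInfo info = env.getTaskManagerInfo();
    Configuration conf = info.getConfiguration();
    String hostname    = info.getHostname();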
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index b1466c9..30e417b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.PactTaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -62,6 +63,8 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 
 	private ExecutionConfig executionConfig = new ExecutionConfig();
 
+	private TaskManagerRuntimeInfo taskManageInfo;
+
 	// --------------------------------------------------------------------------------------------
 	//  Constructors
 	// --------------------------------------------------------------------------------------------
@@ -70,6 +73,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	
 	public TestTaskContext(long memoryInBytes) {
 		this.memoryManager = new DefaultMemoryManager(memoryInBytes,1 ,32 * 1024, true);
+		this.taskManageInfo = new TaskManagerRuntimeInfo("localhost", new Configuration());
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -156,6 +160,11 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 	}
 
 	@Override
+	public TaskManagerRuntimeInfo getTaskManagerInfo() {
+		return this.taskManageInfo;
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	public <X> MutableObjectIterator<X> getInput(int index) {
 		switch (index) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
index 452e4c1..c0f8f59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -24,9 +24,6 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -40,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -192,10 +190,6 @@ public class MutableHashTablePerformanceBenchmark {
 		InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
 		InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
 		
-		Configuration conf = new Configuration();
-		conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
-		GlobalConfiguration.includeConfiguration(conf);
-		
 		// allocate the memory for the HashTable
 		List<MemorySegment> memSegments;
 		try {
@@ -212,7 +206,7 @@ public class MutableHashTablePerformanceBenchmark {
 		final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
 			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
 			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
-			memSegments, ioManager);
+			memSegments, ioManager, enableBloomFilter);
 		join.open(buildIterator, probeIterator);
 		
 		final StringPair recordReuse = new StringPair();

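The benchmark shows the user-facing effect of the refactoring: the bloom filter switch
moves from mutable global configuration to an explicit constructor argument, which keeps
benchmark runs independent of global state. Side by side (the old lines are quoted from
the removed code):

    // old (removed): toggled through global configuration
    //   Configuration conf = new Configuration();
    //   conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
    //   GlobalConfiguration.includeConfiguration(conf);
    // new: passed directly to the table
    MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
            pairBuildSideAccesssor, pairProbeSideAccesssor,
            pairBuildSideComparator, pairProbeSideComparator, pairComparator,
            memSegments, ioManager, enableBloomFilter);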
http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
index f4d2251..0d5a26e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
@@ -155,7 +155,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -242,7 +242,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -291,7 +291,7 @@ public class NonReusingHashMatchIteratorITCase {
 				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -378,7 +378,7 @@ public class NonReusingHashMatchIteratorITCase {
 				new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -425,7 +425,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -472,7 +472,7 @@ public class NonReusingHashMatchIteratorITCase {
 					new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index f5105bb..306a370 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -239,7 +239,7 @@ public class NonReusingReOpenableHashTableITCase {
 				new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
 						buildInput, probeInput, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 		iterator.open();
 		// do first join with both inputs
@@ -277,7 +277,7 @@ public class NonReusingReOpenableHashTableITCase {
 	//
 	//
 
-	private final MutableObjectIterator<Record> getProbeInput(final int numKeys,
+	private MutableObjectIterator<Record> getProbeInput(final int numKeys,
 			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
 		MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
 		MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
@@ -334,9 +334,9 @@ public class NonReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
+				memSegments, ioManager, true);
 
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
 			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
 			if(probe == 0) {
@@ -348,9 +348,8 @@ public class NonReusingReOpenableHashTableITCase {
 			Record record;
 			final Record recordReuse = new Record();
 
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
 
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -370,10 +369,10 @@ public class NonReusingReOpenableHashTableITCase {
 
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 
 				map.put(key, contained);
@@ -450,11 +449,12 @@ public class NonReusingReOpenableHashTableITCase {
 		final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
 				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
 				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager);
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
+				memSegments, ioManager, true);
+		
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
 			// create a probe input that gives 10 million pairs with 10 values sharing a key
 			MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
+			if (probe == 0) {
 				join.open(buildInput, probeInput);
 			} else {
 				join.reopenProbe(probeInput);
@@ -462,9 +462,8 @@ public class NonReusingReOpenableHashTableITCase {
 			Record record;
 			final Record recordReuse = new Record();
 
-			while (join.nextRecord())
-			{
-				int numBuildValues = 0;
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
 
 				final Record probeRec = join.getCurrentProbeRecord();
 				int key = probeRec.getField(0, IntValue.class).getValue();
@@ -484,10 +483,10 @@ public class NonReusingReOpenableHashTableITCase {
 
 				Long contained = map.get(key);
 				if (contained == null) {
-					contained = Long.valueOf(numBuildValues);
+					contained = numBuildValues;
 				}
 				else {
-					contained = Long.valueOf(contained.longValue() + numBuildValues);
+					contained = contained + numBuildValues;
 				}
 
 				map.put(key, contained);

http://git-wip-us.apache.org/repos/asf/flink/blob/0b73b438/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
index 18cd8d0..f770ca4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -155,7 +155,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -242,7 +242,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
 						input1, input2, this.recordSerializer, this.record1Comparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-						this.memoryManager, ioManager, this.parentTask, 1.0);
+						this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -291,7 +291,7 @@ public class ReusingHashMatchIteratorITCase {
 				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 
 			iterator.open();
 			
@@ -378,7 +378,7 @@ public class ReusingHashMatchIteratorITCase {
 				new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
 					input1, input2, this.recordSerializer, this.record1Comparator, 
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0);
+					this.memoryManager, ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -425,7 +425,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator,
 						this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();
 			
@@ -472,7 +472,7 @@ public class ReusingHashMatchIteratorITCase {
 					new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
 						input1, input2, this.pairSerializer, this.pairComparator, 
 						this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-						this.memoryManager, this.ioManager, this.parentTask, 1.0);
+						this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
 			
 			iterator.open();