Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/26 22:50:49 UTC

[6/8] flink git commit: [FLINK-2853] Port MutableHashTablePerformanceBenchmark to JMH.

[FLINK-2853] Port MutableHashTablePerformanceBenchmark to JMH.

This closes #1267


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5ee55bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5ee55bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5ee55bd

Branch: refs/heads/master
Commit: e5ee55bdaf0dcd6e3dc38b52964a8570b22dd671
Parents: 75a5257
Author: gallenvara <ga...@126.com>
Authored: Fri Oct 16 16:48:30 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 26 20:05:03 2015 +0100

----------------------------------------------------------------------
 flink-benchmark/pom.xml                         |  11 +-
 .../MutableHashTablePerformanceBenchmark.java   | 359 +++++++++++++++++++
 .../MutableHashTablePerformanceBenchmark.java   | 262 --------------
 3 files changed, 368 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
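
In short: the removed flink-runtime test timed each join by hand with System.currentTimeMillis inside JUnit @Test methods, with no warmup or repetition control; the ported flink-benchmark version expresses each scenario as a JMH @Benchmark, so warmup, measurement iterations, forking, and averaging are handled by the harness. The pom change pulls in the flink-runtime test-jar, which provides the DummyInvokable and StringPair test utilities the benchmark imports, and swaps the flink-core test-jar for flink-java. A minimal sketch of the JMH pattern the ported file follows (class and method names here are illustrative, not part of the commit):

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Thread)                    // one state instance per benchmark thread
@BenchmarkMode(Mode.AverageTime)        // report the average time per invocation
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class JmhSkeleton {

    @Setup                              // takes the place of JUnit's @Before
    public void setup() { /* allocate managers, serializers, ... */ }

    @TearDown                           // takes the place of JUnit's @After
    public void tearDown() { /* release resources, verify shutdown */ }

    @Benchmark                          // takes the place of @Test; JMH does the timing
    public void scenario() { /* the work to measure */ }

    public static void main(String[] args) throws Exception {
        new Runner(new OptionsBuilder()
                .include(JmhSkeleton.class.getSimpleName())
                .warmupIterations(2)
                .measurementIterations(2)
                .forks(1)
                .build()).run();
    }
}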


http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml
index d6ba1d7..b22e0d1 100644
--- a/flink-benchmark/pom.xml
+++ b/flink-benchmark/pom.xml
@@ -56,14 +56,21 @@ under the License.
       <artifactId>jmh-generator-annprocess</artifactId>
       <version>${jmh.version}</version>
       <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <version>0.10-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
+      <artifactId>flink-java</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
-    </dependency>  
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..186c595
--- /dev/null
+++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.MutableHashTable;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static org.junit.Assert.fail;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class MutableHashTablePerformanceBenchmark {
+	
+	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+	
+	private MemoryManager memManager;
+	private IOManager ioManager;
+	
+	private TypeSerializer<StringPair> pairBuildSideAccessor;
+	private TypeSerializer<StringPair> pairProbeSideAccessor;
+	private TypeComparator<StringPair> pairBuildSideComparator;
+	private TypeComparator<StringPair> pairProbeSideComparator;
+	private TypePairComparator<StringPair, StringPair> pairComparator;
+	
+	private static final String COMMENT = "this comment pads each record to 96 bytes; with the integer key and a separator it is 100 bytes.";
+	
+	@Setup
+	public void setup() {
+		this.pairBuildSideAccessor = new StringPairSerializer();
+		this.pairProbeSideAccessor = new StringPairSerializer();
+		this.pairBuildSideComparator = new StringPairComparator();
+		this.pairProbeSideComparator = new StringPairComparator();
+		this.pairComparator = new StringPairPairComparator();
+		
+		this.memManager = new MemoryManager(64 * 1024 * 1024, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+	
+	@TearDown
+	public void tearDown() {
+		// shut down the I/O manager and the memory manager, and verify a clean shutdown
+		this.ioManager.shutdown();
+		if (!this.ioManager.isProperlyShutDown()) {
+			fail("I/O manager was not properly shut down.");
+		}
+		if (!this.memManager.verifyEmpty()) {
+			fail("Not all memory was properly released to the memory manager --> Memory Leak.");
+		}
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithBloomFilter1() throws IOException {
+		// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 10000000) in steps of 10.
+		int buildSize = 1000000;
+		int buildStep = 10;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 500000;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		
+		System.out.println("HybridHashJoin1:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithoutBloomFilter1() throws IOException {
+		// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 10000000) in steps of 10.
+		int buildSize = 1000000;
+		int buildStep = 10;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 500000;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin1:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithBloomFilter2() throws IOException {
+		// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 5000000) in steps of 5.
+		int buildSize = 1000000;
+		int buildStep = 5;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 1000000;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		
+		System.out.println("HybridHashJoin2:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithoutBloomFilter2() throws IOException {
+		// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 5000000) in steps of 5.
+		int buildSize = 1000000;
+		int buildStep = 5;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 1000000;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin2:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithBloomFilter3() throws IOException {
+		// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 2000000) in steps of 2.
+		int buildSize = 1000000;
+		int buildStep = 2;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 2500000;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		
+		System.out.println("HybridHashJoin3:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithoutBloomFilter3() throws IOException {
+		// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 2000000) in steps of 2.
+		int buildSize = 1000000;
+		int buildStep = 2;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 2500000;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin3:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithBloomFilter4() throws IOException {
+		// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int buildSize = 1000000;
+		int buildStep = 1;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = probeSize / buildStep;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		
+		System.out.println("HybridHashJoin4:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	@Benchmark
+	public void compareMutableHashTableWithoutBloomFilter4() throws IOException {
+		// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+		// create a build input of 1000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int buildSize = 1000000;
+		int buildStep = 1;
+		int buildScope = buildStep * buildSize;
+		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = probeSize / buildStep;
+		
+		this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin4:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
+	}
+	
+	private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
+		int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
+		
+		InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
+		InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
+		
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required for this hash join; allocate all of the memory manager's pages
+			memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+		} catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return -1;
+		}
+		
+		// ----------------------------------------------------------------------------------------
+		
+		final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
+			this.pairBuildSideAccessor, this.pairProbeSideAccessor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager, enableBloomFilter);
+		join.open(buildIterator, probeIterator);
+		
+		final StringPair recordReuse = new StringPair();
+		int numRecordsInJoinResult = 0;
+		
+		while (join.nextRecord()) {
+			MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
+		
+		join.close();
+		// ----------------------------------------------------------------------------------------
+		
+		this.memManager.release(join.getFreedMemory());
+		return 1;
+	}
+	
+	
+	static class InputIterator implements MutableObjectIterator<StringPair> {
+		
+		private int numLeft;
+		private int distance;
+		private int scope;
+		
+		public InputIterator(int size, int distance, int scope) {
+			this.numLeft = size;
+			this.distance = distance;
+			this.scope = scope;
+		}
+		
+		@Override
+		public StringPair next(StringPair reuse) throws IOException {
+			if (this.numLeft > 0) {
+				numLeft--;
+				int currentKey = (numLeft * distance) % scope;
+				reuse.setKey(Integer.toString(currentKey));
+				reuse.setValue(COMMENT);
+				return reuse;
+			} else {
+				return null;
+			}
+		}
+		
+		@Override
+		public StringPair next() throws IOException {
+			return next(new StringPair());
+		}
+	}
+	
+	public static void main(String[] args) throws Exception {
+		Options opt = new OptionsBuilder()
+				.include(MutableHashTablePerformanceBenchmark.class.getSimpleName())
+				.warmupIterations(2)
+				.measurementIterations(2)
+				.forks(1)
+				.build();
+		new Runner(opt).run();
+	}
+}
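
For reference, the expectedResult constants above follow directly from the two key distributions: the build side emits each multiple of buildStep in [0, buildStep * buildSize) exactly once, and the probe side cycles through [0, buildSize) so that every key in that range occurs probeSize / buildSize times; only the buildSize / buildStep build keys below buildSize can match, giving expectedResult = probeSize / buildStep. A standalone check of that arithmetic (not part of the commit; the class name is illustrative):

public class ExpectedResultCheck {
    public static void main(String[] args) {
        int buildSize = 1000000;  // build-side records, keys are multiples of buildStep
        int probeSize = 5000000;  // probe-side records, keys cycle through [0, buildSize)
        for (int buildStep : new int[] {10, 5, 2, 1}) {
            long matchingKeys = buildSize / buildStep;  // build keys that fall inside [0, buildSize)
            long hitsPerKey = probeSize / buildSize;    // occurrences of each key on the probe side
            // prints 500000, 1000000, 2500000, 5000000 -- the four expectedResult values above
            System.out.println("buildStep=" + buildStep + " -> expected joins: " + (matchingKeys * hitsPerKey));
        }
    }
}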

http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
deleted file mode 100644
index 70c9427..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.types.StringPair;
-import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
-import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
-import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class MutableHashTablePerformanceBenchmark {
-	
-	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	
-	private MemoryManager memManager;
-	private IOManager ioManager;
-	
-	private TypeSerializer<StringPair> pairBuildSideAccessor;
-	private TypeSerializer<StringPair> pairProbeSideAccessor;
-	private TypeComparator<StringPair> pairBuildSideComparator;
-	private TypeComparator<StringPair> pairProbeSideComparator;
-	private TypePairComparator<StringPair, StringPair> pairComparator;
-	
-	private static final String COMMENT = "this comment pads each record to 96 bytes; with the integer key and a separator it is 100 bytes.";
-	
-	
-	@Before
-	public void setup() {
-		this.pairBuildSideAccessor = new StringPairSerializer();
-		this.pairProbeSideAccessor = new StringPairSerializer();
-		this.pairBuildSideComparator = new StringPairComparator();
-		this.pairProbeSideComparator = new StringPairComparator();
-		this.pairComparator = new StringPairPairComparator();
-		
-		this.memManager = new MemoryManager(64 * 1024 * 1024, 1);
-		this.ioManager = new IOManagerAsync();
-	}
-	
-	@After
-	public void tearDown() {
-		// shut down the I/O manager and the memory manager, and verify a clean shutdown
-		this.ioManager.shutdown();
-		if (!this.ioManager.isProperlyShutDown()) {
-			fail("I/O manager was not properly shut down.");
-		}
-		if (!this.memManager.verifyEmpty()) {
-			fail("Not all memory was properly released to the memory manager --> Memory Leak.");
-		}
-	}
-	
-	@Test
-	public void compareMutableHashTablePerformance1() throws IOException {
-		// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
-		// create a build input of 1000000 records, with keys spread over [0, 10000000) in steps of 10.
-		int buildSize = 1000000;
-		int buildStep = 10;
-		int buildScope = buildStep * buildSize;
-		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
-		int probeSize = 5000000;
-		int probeStep = 1;
-		int probeScope = buildSize;
-		
-		int expectedResult = 500000;
-		
-		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-		
-		System.out.println("HybridHashJoin2:");
-		System.out.println("Build input size: " + 100 * buildSize);
-		System.out.println("Probe input size: " + 100 * probeSize);
-		System.out.println("Available memory: " + this.memManager.getMemorySize());
-		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
-		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
-	}
-	
-	@Test
-	public void compareMutableHashTablePerformance2() throws IOException {
-		// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
-		// create a build input of 1000000 records, with keys spread over [0, 5000000) in steps of 5.
-		int buildSize = 1000000;
-		int buildStep = 5;
-		int buildScope = buildStep * buildSize;
-		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
-		int probeSize = 5000000;
-		int probeStep = 1;
-		int probeScope = buildSize;
-		
-		int expectedResult = 1000000;
-		
-		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-		
-		System.out.println("HybridHashJoin3:");
-		System.out.println("Build input size: " + 100 * buildSize);
-		System.out.println("Probe input size: " + 100 * probeSize);
-		System.out.println("Available memory: " + this.memManager.getMemorySize());
-		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
-		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
-	}
-	
-	@Test
-	public void compareMutableHashTablePerformance3() throws IOException {
-		// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
-		// create a build input of 1000000 records, with keys spread over [0, 2000000) in steps of 2.
-		int buildSize = 1000000;
-		int buildStep = 2;
-		int buildScope = buildStep * buildSize;
-		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
-		int probeSize = 5000000;
-		int probeStep = 1;
-		int probeScope = buildSize;
-		
-		int expectedResult = 2500000;
-		
-		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-		
-		System.out.println("HybridHashJoin4:");
-		System.out.println("Build input size: " + 100 * buildSize);
-		System.out.println("Probe input size: " + 100 * probeSize);
-		System.out.println("Available memory: " + this.memManager.getMemorySize());
-		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
-		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
-	}
-	
-	@Test
-	public void compareMutableHashTablePerformance4() throws IOException {
-		// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
-		// create a build input of 1000000 records, with keys spread over [0, 1000000) in steps of 1.
-		int buildSize = 1000000;
-		int buildStep = 1;
-		int buildScope = buildStep * buildSize;
-		// create a probe input of 5000000 records, with keys spread over [0, 1000000) in steps of 1.
-		int probeSize = 5000000;
-		int probeStep = 1;
-		int probeScope = buildSize;
-		
-		int expectedResult = probeSize / buildStep;
-		
-		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-		
-		System.out.println("HybridHashJoin5:");
-		System.out.println("Build input size: " + 100 * buildSize);
-		System.out.println("Probe input size: " + 100 * probeSize);
-		System.out.println("Available memory: " + this.memManager.getMemorySize());
-		System.out.println("Probe records filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "%.");
-		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
-	}
-	
-	private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
-		int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
-		
-		InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
-		InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
-		
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			// 33 is the minimum number of pages required for this hash join; allocate all of the memory manager's pages
-			memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
-		} catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return -1;
-		}
-		
-		// ----------------------------------------------------------------------------------------
-		
-		long start = System.currentTimeMillis();
-		final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
-			this.pairBuildSideAccessor, this.pairProbeSideAccessor,
-			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
-			memSegments, ioManager, enableBloomFilter);
-		join.open(buildIterator, probeIterator);
-		
-		final StringPair recordReuse = new StringPair();
-		int numRecordsInJoinResult = 0;
-		
-		while (join.nextRecord()) {
-			MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
-			while (buildSide.next(recordReuse) != null) {
-				numRecordsInJoinResult++;
-			}
-		}
-		Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
-		
-		join.close();
-		long cost = System.currentTimeMillis() - start;
-		// ----------------------------------------------------------------------------------------
-		
-		this.memManager.release(join.getFreedMemory());
-		return cost;
-	}
-	
-	
-	static class InputIterator implements MutableObjectIterator<StringPair> {
-		
-		private int numLeft;
-		private int distance;
-		private int scope;
-		
-		public InputIterator(int size, int distance, int scope) {
-			this.numLeft = size;
-			this.distance = distance;
-			this.scope = scope;
-		}
-		
-		@Override
-		public StringPair next(StringPair reuse) throws IOException {
-			if (this.numLeft > 0) {
-				numLeft--;
-				int currentKey = (numLeft * distance) % scope;
-				reuse.setKey(Integer.toString(currentKey));
-				reuse.setValue(COMMENT);
-				return reuse;
-			} else {
-				return null;
-			}
-		}
-		
-		@Override
-		public StringPair next() throws IOException {
-			return next(new StringPair());
-		}
-	}
-}
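
One editorial note on the ported benchmark: the four scenarios are spelled out as eight near-identical @Benchmark methods (with and without the bloom filter), and the System.out.println calls sit inside the measured methods, so the reporting is timed together with the join. JMH's @Param annotation could express the same matrix in a single method and keep the printing out of the measured path; a hedged sketch of that alternative (class and field names are illustrative, not part of the commit):

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class HashTableBloomFilterMatrix {

    // JMH runs the benchmark once for every combination of the listed values.
    @Param({"true", "false"})
    public boolean enableBloomFilter;

    // buildStep controls how many spilled probe records can be filtered:
    // 10 -> 90%, 5 -> 80%, 2 -> 50%, 1 -> 0% (the four scenarios above)
    @Param({"10", "5", "2", "1"})
    public int buildStep;

    @Benchmark
    public void hybridHashJoin() throws IOException {
        // would call the hybridHashJoin(...) helper above with
        // buildSize = 1000000, probeSize = 5000000 and the two params;
        // any println reporting belongs in @Setup/@TearDown, not here
    }
}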