You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/26 22:50:49 UTC
[6/8] flink git commit: [FLINK-2853] Port
MutableHashTablePerformanceBenchmark to JMH.
[FLINK-2853] Port MutableHashTablePerformanceBenchmark to JMH.
This closes #1267
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5ee55bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5ee55bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5ee55bd
Branch: refs/heads/master
Commit: e5ee55bdaf0dcd6e3dc38b52964a8570b22dd671
Parents: 75a5257
Author: gallenvara <ga...@126.com>
Authored: Fri Oct 16 16:48:30 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Oct 26 20:05:03 2015 +0100
----------------------------------------------------------------------
flink-benchmark/pom.xml | 11 +-
.../MutableHashTablePerformanceBenchmark.java | 359 +++++++++++++++++++
.../MutableHashTablePerformanceBenchmark.java | 262 --------------
3 files changed, 368 insertions(+), 264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml
index d6ba1d7..b22e0d1 100644
--- a/flink-benchmark/pom.xml
+++ b/flink-benchmark/pom.xml
@@ -56,14 +56,21 @@ under the License.
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
+ <artifactId>flink-java</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
- </dependency>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..186c595
--- /dev/null
+++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.MutableHashTable;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import static org.junit.Assert.fail;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class MutableHashTablePerformanceBenchmark {
+
+ private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+
+ private MemoryManager memManager;
+ private IOManager ioManager;
+
+ private TypeSerializer<StringPair> pairBuildSideAccesssor;
+ private TypeSerializer<StringPair> pairProbeSideAccesssor;
+ private TypeComparator<StringPair> pairBuildSideComparator;
+ private TypeComparator<StringPair> pairProbeSideComparator;
+ private TypePairComparator<StringPair, StringPair> pairComparator;
+
+ private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char.";
+
+ @Setup
+ public void setup() {
+ this.pairBuildSideAccesssor = new StringPairSerializer();
+ this.pairProbeSideAccesssor = new StringPairSerializer();
+ this.pairBuildSideComparator = new StringPairComparator();
+ this.pairProbeSideComparator = new StringPairComparator();
+ this.pairComparator = new StringPairPairComparator();
+
+ this.memManager = new MemoryManager(64 * 1024 * 1024, 1);
+ this.ioManager = new IOManagerAsync();
+ }
+
+ @TearDown
+ public void tearDown() {
+ // shut down I/O manager and Memory Manager and verify the correct shutdown
+ this.ioManager.shutdown();
+ if (!this.ioManager.isProperlyShutDown()) {
+ fail("I/O manager was not property shut down.");
+ }
+ if (!this.memManager.verifyEmpty()) {
+ fail("Not all memory was properly released to the memory manager --> Memory Leak.");
+ }
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithBloomFilter1() throws IOException {
+ // ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 10;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 500000;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+
+ System.out.println("HybridHashJoin1:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithoutBloomFilter1() throws IOException {
+ // ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 10;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 500000;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin1:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithBloomFilter2() throws IOException {
+ // ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 5;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 1000000;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+
+ System.out.println("HybridHashJoin2:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithoutBloomFilter2() throws IOException {
+ // ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 5;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 1000000;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin2:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithBloomFilter3() throws IOException {
+ // ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 2;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 2500000;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+
+ System.out.println("HybridHashJoin3:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithoutBloomFilter3() throws IOException {
+ // ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 2;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = 2500000;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin3:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithBloomFilter4() throws IOException {
+ // ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 1;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = probeSize / buildStep;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+
+ System.out.println("HybridHashJoin4:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ @Benchmark
+ public void compareMutableHashTableWithoutBloomFilter4() throws IOException {
+ // ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+ // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
+ int buildSize = 1000000;
+ int buildStep = 1;
+ int buildScope = buildStep * buildSize;
+ // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+ int probeSize = 5000000;
+ int probeStep = 1;
+ int probeScope = buildSize;
+
+ int expectedResult = probeSize / buildStep;
+
+ this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+ System.out.println("HybridHashJoin4:");
+ System.out.println("Build input size: " + 100 * buildSize);
+ System.out.println("Probe input size: " + 100 * probeSize);
+ System.out.println("Available memory: " + this.memManager.getMemorySize());
+ System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+ }
+
+ private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
+ int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
+
+ InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
+ InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ // 33 is minimum number of pages required to perform hash join this inputs
+ memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+ } catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return -1;
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
+ this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+ this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+ memSegments, ioManager, enableBloomFilter);
+ join.open(buildIterator, probeIterator);
+
+ final StringPair recordReuse = new StringPair();
+ int numRecordsInJoinResult = 0;
+
+ while (join.nextRecord()) {
+ MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
+ while (buildSide.next(recordReuse) != null) {
+ numRecordsInJoinResult++;
+ }
+ }
+ Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
+
+ join.close();
+ // ----------------------------------------------------------------------------------------
+
+ this.memManager.release(join.getFreedMemory());
+ return 1;
+ }
+
+
+ static class InputIterator implements MutableObjectIterator<StringPair> {
+
+ private int numLeft;
+ private int distance;
+ private int scope;
+
+ public InputIterator(int size, int distance, int scope) {
+ this.numLeft = size;
+ this.distance = distance;
+ this.scope = scope;
+ }
+
+ @Override
+ public StringPair next(StringPair reuse) throws IOException {
+ if (this.numLeft > 0) {
+ numLeft--;
+ int currentKey = (numLeft * distance) % scope;
+ reuse.setKey(Integer.toString(currentKey));
+ reuse.setValue(COMMENT);
+ return reuse;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public StringPair next() throws IOException {
+ return next(new StringPair());
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options opt = new OptionsBuilder()
+ .include(MutableHashTablePerformanceBenchmark.class.getSimpleName())
+ .warmupIterations(2)
+ .measurementIterations(2)
+ .forks(1)
+ .build();
+ new Runner(opt).run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5ee55bd/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
deleted file mode 100644
index 70c9427..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.types.StringPair;
-import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
-import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
-import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class MutableHashTablePerformanceBenchmark {
-
- private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-
- private MemoryManager memManager;
- private IOManager ioManager;
-
- private TypeSerializer<StringPair> pairBuildSideAccesssor;
- private TypeSerializer<StringPair> pairProbeSideAccesssor;
- private TypeComparator<StringPair> pairBuildSideComparator;
- private TypeComparator<StringPair> pairProbeSideComparator;
- private TypePairComparator<StringPair, StringPair> pairComparator;
-
- private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char.";
-
-
- @Before
- public void setup() {
- this.pairBuildSideAccesssor = new StringPairSerializer();
- this.pairProbeSideAccesssor = new StringPairSerializer();
- this.pairBuildSideComparator = new StringPairComparator();
- this.pairProbeSideComparator = new StringPairComparator();
- this.pairComparator = new StringPairPairComparator();
-
- this.memManager = new MemoryManager(64 * 1024 * 1024, 1);
- this.ioManager = new IOManagerAsync();
- }
-
- @After
- public void tearDown() {
- // shut down I/O manager and Memory Manager and verify the correct shutdown
- this.ioManager.shutdown();
- if (!this.ioManager.isProperlyShutDown()) {
- fail("I/O manager was not property shut down.");
- }
- if (!this.memManager.verifyEmpty()) {
- fail("Not all memory was properly released to the memory manager --> Memory Leak.");
- }
- }
-
- @Test
- public void compareMutableHashTablePerformance1() throws IOException {
- // ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
- // create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
- int buildSize = 1000000;
- int buildStep = 10;
- int buildScope = buildStep * buildSize;
- // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
- int probeSize = 5000000;
- int probeStep = 1;
- int probeScope = buildSize;
-
- int expectedResult = 500000;
-
- long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
- long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-
- System.out.println("HybridHashJoin2:");
- System.out.println("Build input size: " + 100 * buildSize);
- System.out.println("Probe input size: " + 100 * probeSize);
- System.out.println("Available memory: " + this.memManager.getMemorySize());
- System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
- System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
- }
-
- @Test
- public void compareMutableHashTablePerformance2() throws IOException {
- // ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
- // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
- int buildSize = 1000000;
- int buildStep = 5;
- int buildScope = buildStep * buildSize;
- // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
- int probeSize = 5000000;
- int probeStep = 1;
- int probeScope = buildSize;
-
- int expectedResult = 1000000;
-
- long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
- long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-
- System.out.println("HybridHashJoin3:");
- System.out.println("Build input size: " + 100 * buildSize);
- System.out.println("Probe input size: " + 100 * probeSize);
- System.out.println("Available memory: " + this.memManager.getMemorySize());
- System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
- System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
- }
-
- @Test
- public void compareMutableHashTablePerformance3() throws IOException {
- // ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
- // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
- int buildSize = 1000000;
- int buildStep = 2;
- int buildScope = buildStep * buildSize;
- // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
- int probeSize = 5000000;
- int probeStep = 1;
- int probeScope = buildSize;
-
- int expectedResult = 2500000;
-
- long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
- long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
-
- System.out.println("HybridHashJoin4:");
- System.out.println("Build input size: " + 100 * buildSize);
- System.out.println("Probe input size: " + 100 * probeSize);
- System.out.println("Available memory: " + this.memManager.getMemorySize());
- System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
- System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
- }
-
- @Test
- public void compareMutableHashTablePerformance4() throws IOException {
- // ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
- // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
- int buildSize = 1000000;
- int buildStep = 1;
- int buildScope = buildStep * buildSize;
- // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
- int probeSize = 5000000;
- int probeStep = 1;
- int probeScope = buildSize;
-
- int expectedResult = probeSize / buildStep;
-
- long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
- long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
-
- System.out.println("HybridHashJoin5:");
- System.out.println("Build input size: " + 100 * buildSize);
- System.out.println("Probe input size: " + 100 * probeSize);
- System.out.println("Available memory: " + this.memManager.getMemorySize());
- System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
- System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
- }
-
- private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
- int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
-
- InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
- InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
-
- // allocate the memory for the HashTable
- List<MemorySegment> memSegments;
- try {
- // 33 is minimum number of pages required to perform hash join this inputs
- memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
- } catch (MemoryAllocationException maex) {
- fail("Memory for the Join could not be provided.");
- return -1;
- }
-
- // ----------------------------------------------------------------------------------------
-
- long start = System.currentTimeMillis();
- final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
- this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
- this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
- memSegments, ioManager, enableBloomFilter);
- join.open(buildIterator, probeIterator);
-
- final StringPair recordReuse = new StringPair();
- int numRecordsInJoinResult = 0;
-
- while (join.nextRecord()) {
- MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
- while (buildSide.next(recordReuse) != null) {
- numRecordsInJoinResult++;
- }
- }
- Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
-
- join.close();
- long cost = System.currentTimeMillis() - start;
- // ----------------------------------------------------------------------------------------
-
- this.memManager.release(join.getFreedMemory());
- return cost;
- }
-
-
- static class InputIterator implements MutableObjectIterator<StringPair> {
-
- private int numLeft;
- private int distance;
- private int scope;
-
- public InputIterator(int size, int distance, int scope) {
- this.numLeft = size;
- this.distance = distance;
- this.scope = scope;
- }
-
- @Override
- public StringPair next(StringPair reuse) throws IOException {
- if (this.numLeft > 0) {
- numLeft--;
- int currentKey = (numLeft * distance) % scope;
- reuse.setKey(Integer.toString(currentKey));
- reuse.setValue(COMMENT);
- return reuse;
- } else {
- return null;
- }
- }
-
- @Override
- public StringPair next() throws IOException {
- return next(new StringPair());
- }
- }
-}