You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/06 18:52:17 UTC
[3/5] flink git commit: [FLINK-2240] Add bloom filter to filter probe
records during hash join.
[FLINK-2240] Add bloom filter to filter probe records during hash join.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61dcae39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61dcae39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61dcae39
Branch: refs/heads/master
Commit: 61dcae391cb3b45ba3aff47d4d9163889d2958a4
Parents: 685086a
Author: chengxiang li <ch...@intel.com>
Authored: Fri Jul 3 23:53:47 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 17:14:39 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 13 +-
.../operators/hash/MutableHashTable.java | 145 +++++++++-
.../runtime/operators/util/BloomFilter.java | 226 ++++++++++++++++
.../MutableHashTablePerformanceBenchmark.java | 268 +++++++++++++++++++
.../runtime/operators/util/BloomFilterTest.java | 162 +++++++++++
5 files changed, 806 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c76741b..dad2d99 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,7 @@ public final class ConfigConstants {
* for hybrid hash joins.
*/
public static final String DEFAULT_SPILLING_MAX_FAN_KEY = "taskmanager.runtime.max-fan";
-
+
/**
* Key for the default spilling threshold. When more than the threshold memory of the sort buffers is full, the
* sorter will start spilling to disk.
@@ -190,6 +190,12 @@ public final class ConfigConstants {
* A value of 0 indicates infinite waiting.
*/
public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
+
+ /**
+ * Whether to enable a bloom filter that filters probe records while they are spilled to disk during the
+ * probe phase, in order to minimize the number of spilled probe records.
+ */
+ public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
// ------------------------ YARN Configuration ------------------------
@@ -552,6 +558,11 @@ public final class ConfigConstants {
* The default timeout for filesystem stream opening: infinite (means max long milliseconds).
*/
public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
+
+ /**
+ * Enable the bloom filter for hash join by default, as it improves hash join performance most of the time.
+ */
+ public static final boolean DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
// ------------------------ YARN Configuration ------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 7f07cfb..4a57986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.hash;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.iterative.io.HashPartitionIterator;
+import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
@@ -194,6 +195,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
* Constant for the bucket status, indicating that the bucket is in memory.
*/
private static final byte BUCKET_STATUS_IN_MEMORY = 0;
+
+ /**
+ * Constant for the bucket status, indicating that the bucket's memory holds a bloom filter.
+ */
+ private static final byte BUCKET_STATUS_IN_FILTER = 1;
// ------------------------------------------------------------------------
// Members
@@ -348,6 +354,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
private boolean running = true;
+ private BloomFilter bloomFilter;
+
// ------------------------------------------------------------------------
// Construction and Teardown
// ------------------------------------------------------------------------
@@ -469,12 +477,19 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.recordComparator.setReference(next);
this.bucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
return true;
- }
- else {
- p.insertIntoProbeBuffer(next);
+ } else {
+ byte status = bucket.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+ if (status == BUCKET_STATUS_IN_FILTER) {
+ this.bloomFilter.setBitsLocation(bucket, bucketInSegmentOffset + BUCKET_HEADER_LENGTH);
+ // Use BloomFilter to filter out all the probe records which would not match any key in spilled build table buckets.
+ if (this.bloomFilter.testHash(hash)) {
+ p.insertIntoProbeBuffer(next);
+ }
+ } else {
+ p.insertIntoProbeBuffer(next);
+ }
}
}
-
// -------------- partition done ---------------
return false;
@@ -710,6 +725,27 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
p.finalizeBuildPhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
}
}
+
+    /**
+     * Creates the single {@link BloomFilter} instance that is re-pointed at individual buckets.
+     * The filter is sized to the bucket payload (bucket size minus header) and tuned for the
+     * estimated maximum number of entries per bucket.
+     *
+     * @param numBuckets number of buckets in the hash table, used for the per-bucket entry estimate
+     */
+    private void initBloomFilter(int numBuckets) {
+        // The whole bucket except its header is handed over to the bloom filter.
+        final int filterBytes = HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH;
+        final int expectedEntriesPerBucket = getEstimatedMaxBucketEntries(
+            this.availableMemory.size(), this.segmentSize, numBuckets, this.avgRecordLen);
+
+        this.bloomFilter = new BloomFilter(expectedEntriesPerBucket, filterBytes);
+
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        // The estimate is only needed for the log line, so it is computed after the guard.
+        final int filterBits = filterBytes << 3;
+        final double fpp = BloomFilter.estimateFalsePositiveProbability(expectedEntriesPerBucket, filterBits);
+        LOG.debug(String.format("Create BloomFilter with average input entries per bucket[%d], bytes size[%d], false positive probability[%f].",
+            expectedEntriesPerBucket, filterBytes, fpp));
+    }
+
+    /**
+     * Estimates an upper bound for the number of records that end up in one bucket: the number of
+     * records that fit into the given memory, multiplied by (MAX_RECURSION_DEPTH + 1), spread
+     * evenly over all buckets.
+     *
+     * @param numBuffers number of memory buffers available
+     * @param bufferSize size of one buffer in bytes
+     * @param numBuckets number of buckets the records are spread over
+     * @param recordLenBytes average record length in bytes
+     * @return the estimated maximum number of entries per bucket, truncated to an int
+     */
+    final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+        // Widen before multiplying so that large memory configurations do not overflow an int.
+        final long memoryBytes = ((long) bufferSize) * numBuffers;
+        final long recordsInMemory = memoryBytes / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+        final long recordsOverAllLevels = (MAX_RECURSION_DEPTH + 1) * recordsInMemory;
+        return (int) (recordsOverAllLevels / numBuckets);
+    }
/**
* @param p
@@ -816,7 +852,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
final int hashCode = hash(btComparator.hash(rec), nextRecursionLevel);
insertIntoTable(rec, hashCode);
}
-
+
// finalize the partitions
for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
HashPartition<BT, PT> part = this.partitionsBeingBuilt.get(i);
@@ -853,6 +889,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
if (pointer != -1) {
// record was inserted into an in-memory partition. a pointer must be inserted into the buckets
insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer);
+ } else {
+ byte status = bucket.get(bucketInSegmentPos + HEADER_STATUS_OFFSET);
+ if (status == BUCKET_STATUS_IN_FILTER) {
+ // The partition has already been spilled: point the bloom filter at the bits of the current bucket
+ // and add the record's hash code to it.
+ this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+ this.bloomFilter.addHash(hashCode);
+ }
}
}
@@ -1047,6 +1091,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
this.buckets = table;
this.numBuckets = numBuckets;
+
+ boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+ ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
+ if (enableBloomFilter) {
+ initBloomFilter(numBuckets);
+ }
}
/**
@@ -1088,6 +1138,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
final HashPartition<BT, PT> p = partitions.get(largestPartNum);
+ boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+ ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEAFULT_HASHJOIN_ENABLE_BLOOMFILTER);
+ if (enableBloomFilter) {
+ buildBloomFilterForBucketsInPartition(largestPartNum, p);
+ }
+
// spill the partition
int numBuffersFreed = p.spillPartition(this.availableMemory, this.ioManager,
this.currentEnumerator.next(), this.writeBehindBuffers);
@@ -1101,6 +1157,81 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
return largestPartNum;
}
+    /**
+     * Finds all buckets that belong to the given partition and builds a bloom filter for each of
+     * them (including its overflow buckets), as long as the bucket is still marked as in-memory.
+     *
+     * @param partNum number of the partition whose buckets get a bloom filter
+     * @param partition the partition itself, needed to resolve overflow segments
+     */
+    final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
+        final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
+
+        // Only the first numBuckets slots of the table are real buckets. Visiting every slot of every
+        // segment would interpret unused bytes of the last segment as bucket headers.
+        // NOTE(review): assumes buckets are laid out consecutively across the segments and that
+        // slots past numBuckets are not initialized by the table setup - confirm against initTable.
+        int visitedBuckets = 0;
+        for (MemorySegment segment : this.buckets) {
+            for (int i = 0; i < bucketsPerSegment && visitedBuckets < this.numBuckets; i++, visitedBuckets++) {
+                final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+                final byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+                if (partitionNumber != partNum) {
+                    continue;
+                }
+                final byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+                if (status == BUCKET_STATUS_IN_MEMORY) {
+                    buildBloomFilterForBucket(bucketInSegmentOffset, segment, partition);
+                }
+            }
+            if (visitedBuckets >= this.numBuckets) {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Turns the memory of one bucket (everything except the bucket header) into the bit set of a
+     * bloom filter and fills it with the hash codes of the build records stored in that bucket
+     * and in its overflow buckets.
+     *
+     * @param bucketInSegmentPos offset of the bucket inside the memory segment
+     * @param bucket memory segment that contains the bucket
+     * @param p partition the bucket belongs to, needed to resolve overflow segments
+     */
+    final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+        final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+        // An empty bucket has nothing to add; a negative count means the header does not describe a
+        // valid bucket and would make the array allocation below throw NegativeArraySizeException.
+        // Either way the bucket keeps its status and is simply not filtered.
+        if (count <= 0) {
+            return;
+        }
+
+        // The hash codes and the bloom filter occupy the same bytes, so all hash codes are read out
+        // first and only then written back as bloom filter bits.
+        final int[] hashCodes = new int[count];
+        for (int i = 0; i < count; i++) {
+            hashCodes[i] = bucket.getInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+        }
+
+        // NOTE(review): the bits are OR-ed on top of whatever the bucket bytes held before (hash codes,
+        // pointers). That cannot cause false negatives but may raise the false positive rate; consider
+        // clearing the bit set via BloomFilter#reset() here once confirmed that nothing reads these bytes.
+        this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+        for (int hashCode : hashCodes) {
+            this.bloomFilter.addHash(hashCode);
+        }
+        buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
+    }
+
+    /**
+     * Follows the forward pointers of a bucket through its overflow buckets and adds every hash
+     * code found there to the bloom filter. Only if the whole chain could be processed is the
+     * bucket marked with BUCKET_STATUS_IN_FILTER; otherwise its status is left untouched.
+     *
+     * @param bucketInSegmentPos offset of the bucket inside the memory segment
+     * @param bucket memory segment that contains the bucket
+     * @param p partition that owns the overflow segments
+     */
+    final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+        int entriesSeen = 0;
+        long nextPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
+
+        while (nextPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
+            // Upper 32 bits: index of the overflow segment, lower 32 bits: offset inside it.
+            final int segmentIndex = (int) (nextPointer >>> 32);
+            if (segmentIndex < 0 || segmentIndex >= p.numOverflowSegments) {
+                // The pointer does not reference a known overflow segment: give up on this bucket.
+                return;
+            }
+            final MemorySegment overflow = p.overflowSegments[segmentIndex];
+            final int overflowBucketOffset = (int) (nextPointer & 0xffffffff);
+
+            final int count = overflow.getShort(overflowBucketOffset + HEADER_COUNT_OFFSET);
+            entriesSeen += count;
+            // The bloom filter of a bucket has 112 * 8 bits. With more than 2048 expected entries the
+            // false positive probability would exceed 0.9, which makes the filter pure overhead.
+            if (entriesSeen > 2048) {
+                return;
+            }
+
+            final int firstHashCodeOffset = overflowBucketOffset + BUCKET_HEADER_LENGTH;
+            for (int i = 0; i < count; i++) {
+                this.bloomFilter.addHash(overflow.getInt(firstHashCodeOffset + i * HASH_CODE_LEN));
+            }
+
+            nextPointer = overflow.getLong(overflowBucketOffset + HEADER_FORWARD_OFFSET);
+        }
+
+        // The complete chain was added, so probe records may now be filtered against this bucket.
+        bucket.put(bucketInSegmentPos + HEADER_STATUS_OFFSET, BUCKET_STATUS_IN_FILTER);
+    }
+
/**
* This method makes sure that at least a certain number of memory segments is in the list of free segments.
* Free memory can be in the list of free segments, or in the return-queue where segments used to write behind are
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
new file mode 100644
index 0000000..947a56b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.memory.MemorySegment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A bloom filter is a probabilistic data structure for set membership checks. Bloom filters are
+ * highly space efficient compared to a HashSet. Because of their probabilistic nature, false
+ * positives are possible (an element was never added, but the test says true), whereas false
+ * negatives are not (if an element was added, the test never says false). Lowering the false
+ * positive probability increases the space requirement. Bloom filters are sensitive to the number
+ * of elements inserted: the expected number of entries must be specified on creation, and if the
+ * number of insertions exceeds it, the false positive probability increases accordingly.
+ * <p/>
+ * Internally, this implementation stores its bit set in a MemorySegment. BloomFilter and BitSet
+ * are designed to be switched between different MemorySegments, so that Flink can share the same
+ * BloomFilter/BitSet object instance for many different bloom filters.
+ * <p/>
+ * Part of this class refers to the implementation from the Apache Hive project
+ * https://github.com/apache/hive/blob/master/common/src/java/org/apache/hive/common/util/BloomFilter.java
+ */
+public class BloomFilter {
+
+    protected BitSet bitSet;
+    protected int expectedEntries;
+    protected int numHashFunctions;
+
+    /**
+     * Creates a bloom filter without backing memory; {@link #setBitsLocation(MemorySegment, int)}
+     * must be called before hashes are added or tested.
+     *
+     * @param expectedEntries expected number of inserted entries, must be greater than 0
+     * @param byteSize size of the bit set in bytes, must be a positive multiple of 8
+     * @throws IllegalArgumentException if one of the arguments violates its constraint
+     */
+    public BloomFilter(int expectedEntries, int byteSize) {
+        checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+        this.expectedEntries = expectedEntries;
+        this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, byteSize << 3);
+        this.bitSet = new BitSet(byteSize);
+    }
+
+    /**
+     * Points this filter at the memory that holds its bits.
+     *
+     * @param memorySegment segment that contains the bits
+     * @param offset byte offset inside the segment at which the bits start
+     */
+    public void setBitsLocation(MemorySegment memorySegment, int offset) {
+        this.bitSet.setMemorySegment(memorySegment, offset);
+    }
+
+    /**
+     * Computes the optimal number of bits for the given number of input entries and the expected
+     * false positive probability.
+     *
+     * @param inputEntries expected number of inserted entries
+     * @param fpp desired false positive probability
+     * @return optimal number of bits, truncated to an int
+     */
+    public static int optimalNumOfBits(long inputEntries, double fpp) {
+        return (int) (-inputEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+    }
+
+    /**
+     * Computes the false positive probability for the given number of input entries and bit size,
+     * assuming the optimal number of hash functions is used.
+     * Note: this is only the mathematically expected value; the false positive rate observed in
+     * practice is not guaranteed to stay below it.
+     *
+     * @param inputEntries expected number of inserted entries
+     * @param bitSize size of the bit set in bits
+     * @return the estimated false positive probability
+     */
+    public static double estimateFalsePositiveProbability(long inputEntries, int bitSize) {
+        int numFunction = optimalNumOfHashFunctions(inputEntries, bitSize);
+        double p = Math.pow(Math.E, -(double) numFunction * inputEntries / bitSize);
+        return Math.pow(1 - p, numFunction);
+    }
+
+    /**
+     * Computes the number of hash functions that minimizes the false positive probability for the
+     * given number of input entries and bit size.
+     *
+     * @param expectEntries expected number of inserted entries
+     * @param bitSize size of the bit set in bits
+     * @return number of hash functions, at least 1
+     */
+    static int optimalNumOfHashFunctions(long expectEntries, long bitSize) {
+        return Math.max(1, (int) Math.round((double) bitSize / expectEntries * Math.log(2)));
+    }
+
+    /**
+     * Adds a 32 bit hash code to the filter by setting one bit per hash function.
+     *
+     * @param hash32 hash code of the element to add
+     */
+    public void addHash(int hash32) {
+        for (int i = 1; i <= numHashFunctions; i++) {
+            bitSet.set(bitPosition(hash32, i));
+        }
+    }
+
+    /**
+     * Tests whether a 32 bit hash code may have been added to the filter.
+     *
+     * @param hash32 hash code of the element to test
+     * @return false if the hash was definitely never added, true if it may have been added
+     */
+    public boolean testHash(int hash32) {
+        for (int i = 1; i <= numHashFunctions; i++) {
+            if (!bitSet.get(bitPosition(hash32, i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Derives the bit position of the i-th hash function from the hash code and its upper 16 bits.
+     * Shared by {@link #addHash(int)} and {@link #testHash(int)} so both always agree.
+     */
+    private int bitPosition(int hash32, int i) {
+        int combinedHash = hash32 + (i * (hash32 >>> 16));
+        // The position must not be negative; flip all bits if the combined hash is negative.
+        if (combinedHash < 0) {
+            combinedHash = ~combinedHash;
+        }
+        return combinedHash % bitSet.bitSize();
+    }
+
+    /**
+     * Clears all bits of the currently assigned memory location.
+     */
+    public void reset() {
+        this.bitSet.clear();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder output = new StringBuilder();
+        output.append("BloomFilter:\n");
+        output.append("\thash function number:").append(numHashFunctions).append("\n");
+        output.append(bitSet);
+        return output.toString();
+    }
+
+    /**
+     * Bare metal bit set implementation. For performance reasons, this implementation does not check
+     * index bounds and does not grow the bit set if the specified index is greater than its size.
+     */
+    public class BitSet {
+        // Clears the lowest 6 bits of a bit index, which yields the bit index of the containing long.
+        private static final int LONG_POSITION_MASK = 0xffffffc0;
+
+        private MemorySegment memorySegment;
+        // Byte offset inside the MemorySegment at which the bits start.
+        private int offset;
+        // Size of the bit set in bytes.
+        private int length;
+
+        /**
+         * @param byteSize size of the bit set in bytes, must be a positive multiple of 8
+         * @throws IllegalArgumentException if byteSize violates its constraint
+         */
+        public BitSet(int byteSize) {
+            Preconditions.checkArgument(byteSize > 0, "byte size should be greater than 0.");
+            // The lowest 3 bits must be zero, i.e. the size is a multiple of 8 bytes.
+            Preconditions.checkArgument(byteSize << 29 == 0, "bytes size should be integral multiple of long size(8 Bytes).");
+            this.length = byteSize;
+        }
+
+        /**
+         * Assigns the memory that holds the bits.
+         *
+         * @param memorySegment segment that contains the bits
+         * @param offset byte offset inside the segment at which the bits start
+         */
+        public void setMemorySegment(MemorySegment memorySegment, int offset) {
+            this.memorySegment = memorySegment;
+            this.offset = offset;
+        }
+
+        /**
+         * Sets the bit at the specified index.
+         *
+         * @param index - position
+         */
+        public void set(int index) {
+            // Byte offset of the long that contains the bit.
+            int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+            long current = memorySegment.getLong(offset + longIndex);
+            // A long shift only uses the lowest 6 bits of the shift distance, i.e. index % 64.
+            current |= (1L << index);
+            memorySegment.putLong(offset + longIndex, current);
+        }
+
+        /**
+         * Returns true if the bit at the specified index is set.
+         *
+         * @param index - position
+         * @return - value at the bit position
+         */
+        public boolean get(int index) {
+            int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+            long current = memorySegment.getLong(offset + longIndex);
+            return (current & (1L << index)) != 0;
+        }
+
+        /**
+         * Number of bits
+         */
+        public int bitSize() {
+            return length << 3;
+        }
+
+        public MemorySegment getMemorySegment() {
+            return this.memorySegment;
+        }
+
+        /**
+         * Clear the bit set.
+         */
+        public void clear() {
+            long zeroValue = 0L;
+            for (int i = 0; i < (length / 8); i++) {
+                memorySegment.putLong(offset + i * 8, zeroValue);
+            }
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder output = new StringBuilder();
+            output.append("BitSet:\n");
+            // No memory is assigned until setMemorySegment() is called; do not fail in that case.
+            output.append("\tMemorySegment:");
+            if (memorySegment == null) {
+                output.append("null");
+            } else {
+                output.append(memorySegment.size());
+            }
+            output.append("\n");
+            output.append("\tOffset:").append(offset).append("\n");
+            output.append("\tLength:").append(length).append("\n");
+            return output.toString();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..452e4c1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Benchmark that runs the same hybrid hash join with and without the hash join bloom filter and
+ * prints the elapsed time of both runs. The scenarios differ in the share of probe records that
+ * have no matching build record and can therefore be filtered before being spilled.
+ */
+public class MutableHashTablePerformanceBenchmark {
+    private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+
+    private MemoryManager memManager;
+    private IOManager ioManager;
+
+    private TypeSerializer<StringPair> pairBuildSideAccesssor;
+    private TypeSerializer<StringPair> pairProbeSideAccesssor;
+    private TypeComparator<StringPair> pairBuildSideComparator;
+    private TypeComparator<StringPair> pairProbeSideComparator;
+    private TypePairComparator<StringPair, StringPair> pairComparator;
+
+    // Payload attached to every record as its value.
+    private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char.";
+
+
+    @Before
+    public void setup() {
+        this.pairBuildSideAccesssor = new StringPairSerializer();
+        this.pairProbeSideAccesssor = new StringPairSerializer();
+        this.pairBuildSideComparator = new StringPairComparator();
+        this.pairProbeSideComparator = new StringPairComparator();
+        this.pairComparator = new StringPairPairComparator();
+
+        this.memManager = new DefaultMemoryManager(64 * 1024 * 1024, 1);
+        this.ioManager = new IOManagerAsync();
+    }
+
+    @After
+    public void tearDown() {
+        // shut down I/O manager and Memory Manager and verify the correct shutdown
+        this.ioManager.shutdown();
+        if (!this.ioManager.isProperlyShutDown()) {
+            fail("I/O manager was not property shut down.");
+        }
+        if (!this.memManager.verifyEmpty()) {
+            fail("Not all memory was properly released to the memory manager --> Memory Leak.");
+        }
+    }
+
+    @Test
+    public void compareMutableHashTablePerformance1() throws IOException {
+        // ------------------------- 90% filtered during probe spill phase -------------------------
+        // build input: 1000000 records, keys spread over [0 -- 10000000) with a step of 10
+        int buildSize = 1000000;
+        int buildStep = 10;
+        int buildScope = buildStep * buildSize;
+        // probe input: 5000000 records, keys spread over [0 -- 1000000) with a step of 1
+        int probeSize = 5000000;
+        int probeStep = 1;
+        int probeScope = buildSize;
+
+        int expectedResult = 500000;
+
+        long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+        long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+        printReport("HybridHashJoin2:", buildSize, buildScope, probeSize, probeScope, withoutBloomFilterCost, withBloomFilterCost);
+    }
+
+    @Test
+    public void compareMutableHashTablePerformance2() throws IOException {
+        // ------------------------- 80% filtered during probe spill phase -------------------------
+        // build input: 1000000 records, keys spread over [0 -- 5000000) with a step of 5
+        int buildSize = 1000000;
+        int buildStep = 5;
+        int buildScope = buildStep * buildSize;
+        // probe input: 5000000 records, keys spread over [0 -- 1000000) with a step of 1
+        int probeSize = 5000000;
+        int probeStep = 1;
+        int probeScope = buildSize;
+
+        int expectedResult = 1000000;
+
+        long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+        long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+        printReport("HybridHashJoin3:", buildSize, buildScope, probeSize, probeScope, withoutBloomFilterCost, withBloomFilterCost);
+    }
+
+    @Test
+    public void compareMutableHashTablePerformance3() throws IOException {
+        // ------------------------- 50% filtered during probe spill phase -------------------------
+        // build input: 1000000 records, keys spread over [0 -- 2000000) with a step of 2
+        int buildSize = 1000000;
+        int buildStep = 2;
+        int buildScope = buildStep * buildSize;
+        // probe input: 5000000 records, keys spread over [0 -- 1000000) with a step of 1
+        int probeSize = 5000000;
+        int probeStep = 1;
+        int probeScope = buildSize;
+
+        int expectedResult = 2500000;
+
+        // Unlike the other scenarios, this one runs the join without the bloom filter first.
+        long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+        long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+
+        printReport("HybridHashJoin4:", buildSize, buildScope, probeSize, probeScope, withoutBloomFilterCost, withBloomFilterCost);
+    }
+
+    @Test
+    public void compareMutableHashTablePerformance4() throws IOException {
+        // ------------------------- 0% filtered during probe spill phase --------------------------
+        // build input: 1000000 records, keys spread over [0 -- 1000000) with a step of 1
+        int buildSize = 1000000;
+        int buildStep = 1;
+        int buildScope = buildStep * buildSize;
+        // probe input: 5000000 records, keys spread over [0 -- 1000000) with a step of 1
+        int probeSize = 5000000;
+        int probeStep = 1;
+        int probeScope = buildSize;
+
+        int expectedResult = probeSize / buildStep;
+
+        long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+        long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+
+        printReport("HybridHashJoin5:", buildSize, buildScope, probeSize, probeScope, withoutBloomFilterCost, withBloomFilterCost);
+    }
+
+    /**
+     * Prints the parameters of one scenario together with the elapsed time of both join runs.
+     */
+    private void printReport(String title, int buildSize, int buildScope, int probeSize, int probeScope,
+            long withoutBloomFilterCost, long withBloomFilterCost) {
+        System.out.println(title);
+        System.out.println("Build input size: " + 100 * buildSize);
+        System.out.println("Probe input size: " + 100 * probeSize);
+        System.out.println("Available memory: " + this.memManager.getMemorySize());
+        System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+        System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+    }
+
+    /**
+     * Runs one hybrid hash join over generated inputs and verifies the number of joined records.
+     *
+     * @param enableBloomFilter value written to the hash join bloom filter configuration key before the run
+     * @return elapsed wall clock time in milliseconds, measured from table creation until the table is closed
+     */
+    private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
+        int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
+
+        InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
+        InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
+
+        Configuration conf = new Configuration();
+        conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
+        GlobalConfiguration.includeConfiguration(conf);
+
+        // allocate the memory for the HashTable
+        List<MemorySegment> memSegments;
+        try {
+            // hand all pages of the memory manager to the join
+            memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+        } catch (MemoryAllocationException maex) {
+            fail("Memory for the Join could not be provided.");
+            return -1;
+        }
+
+        // ----------------------------------------------------------------------------------------
+
+        long start = System.currentTimeMillis();
+        final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
+            this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+            this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+            memSegments, ioManager);
+
+        long cost;
+        try {
+            join.open(buildIterator, probeIterator);
+
+            final StringPair recordReuse = new StringPair();
+            int numRecordsInJoinResult = 0;
+
+            while (join.nextRecord()) {
+                MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
+                while (buildSide.next(recordReuse) != null) {
+                    numRecordsInJoinResult++;
+                }
+            }
+            Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
+        } finally {
+            // Close the table and hand the memory back even if the join or the assertion failed;
+            // otherwise tearDown() would report a memory leak and hide the real failure.
+            join.close();
+            cost = System.currentTimeMillis() - start;
+            this.memManager.release(join.getFreedMemory());
+        }
+        // ----------------------------------------------------------------------------------------
+
+        return cost;
+    }
+
+
+    /**
+     * Generates {@code size} records whose keys are {@code (i * distance) % scope} for i counting
+     * down from {@code size - 1} to 0; every record carries the same value.
+     */
+    static class InputIterator implements MutableObjectIterator<StringPair> {
+
+        private int numLeft;
+        private final int distance;
+        private final int scope;
+
+        public InputIterator(int size, int distance, int scope) {
+            this.numLeft = size;
+            this.distance = distance;
+            this.scope = scope;
+        }
+
+        @Override
+        public StringPair next(StringPair reuse) throws IOException {
+            if (this.numLeft > 0) {
+                numLeft--;
+                int currentKey = (numLeft * distance) % scope;
+                reuse.setKey(Integer.toString(currentKey));
+                reuse.setValue(COMMENT);
+                return reuse;
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public StringPair next() throws IOException {
+            return next(new StringPair());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
new file mode 100644
index 0000000..cbbeca0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BloomFilterTest { // Tests BloomFilter constructor argument checks, the static sizing helpers, and addHash/testHash on a shared instance.
+
+ private static BloomFilter bloomFilter; // single instance shared by the tests; created once in init()
+ private static final int INPUT_SIZE = 1024; // input count used to size the shared filter
+ private static final double FALSE_POSITIVE_PROBABILITY = 0.05; // target rate passed to optimalNumOfBits in init()
+
+ @BeforeClass
+ public static void init() { // builds the shared filter on top of a heap-backed MemorySegment
+ int bitsSize = BloomFilter.optimalNumOfBits(INPUT_SIZE, FALSE_POSITIVE_PROBABILITY);
+ bitsSize = bitsSize + (Long.SIZE - (bitsSize % Long.SIZE)); // pad up to a multiple of 64 bits; adds a full extra 64 when already aligned
+ int byteSize = bitsSize >>> 3; // bits -> bytes
+ MemorySegment memorySegment = new MemorySegment(new byte[byteSize]);
+ bloomFilter = new BloomFilter(INPUT_SIZE, byteSize);
+ bloomFilter.setBitsLocation(memorySegment, 0); // second argument is presumably the offset into the segment - confirm against BloomFilter
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments1() { // negative first argument must be rejected
+ new BloomFilter(-1, 128);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments2() { // zero first argument must be rejected
+ new BloomFilter(0, 128);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments3() { // negative second argument must be rejected
+ new BloomFilter(1024, -1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments4() { // zero second argument must be rejected
+ new BloomFilter(1024, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBloomFilterArguments5() { // NOTE(review): 21 is presumably rejected for not being a multiple of 8 - confirm against the BloomFilter constructor
+ new BloomFilter(1024, 21);
+ }
+
+ @Test
+ public void testBloomNumBits() { // pins exact outputs of optimalNumOfBits for a range of input sizes and probabilities
+ assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));
+ assertEquals(0, BloomFilter.optimalNumOfBits(0, 1));
+ assertEquals(0, BloomFilter.optimalNumOfBits(1, 1));
+ assertEquals(7, BloomFilter.optimalNumOfBits(1, 0.03));
+ assertEquals(72, BloomFilter.optimalNumOfBits(10, 0.03));
+ assertEquals(729, BloomFilter.optimalNumOfBits(100, 0.03));
+ assertEquals(7298, BloomFilter.optimalNumOfBits(1000, 0.03));
+ assertEquals(72984, BloomFilter.optimalNumOfBits(10000, 0.03));
+ assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03));
+ assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+ assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+ }
+
+ @Test
+ public void testBloomFilterNumHashFunctions() { // pins outputs of optimalNumOfHashFunctions; argument order is presumably (input count, bit size) - confirm
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(-1, -1));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(0, 0));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 0));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 10));
+ assertEquals(7, BloomFilter.optimalNumOfHashFunctions(10, 100)); // the only case here expecting more than one hash function
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10000, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100000, 100));
+ assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000000, 100));
+ }
+
+ @Test
+ public void testBloomFilterFalsePositiveProbability() { // checks that estimateFalsePositiveProbability maps each computed bit size back to its target rate
+ assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+ assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+ assertEquals(4792529, BloomFilter.optimalNumOfBits(1000000, 0.1));
+ assertEquals(3349834, BloomFilter.optimalNumOfBits(1000000, 0.2));
+ assertEquals(2505911, BloomFilter.optimalNumOfBits(1000000, 0.3));
+ assertEquals(1907139, BloomFilter.optimalNumOfBits(1000000, 0.4));
+
+ // Make sure the estimated fpp error is less than 1%.
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 7298440) - 0.03) < 0.01); // tolerance is an absolute difference of 0.01
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 6235224) - 0.05) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 4792529) - 0.1) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 3349834) - 0.2) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 2505911) - 0.3) < 0.01);
+ assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 1907139) - 0.4) < 0.01);
+ }
+
+ @Test
+ public void testHashcodeInput() { // adds five hash codes one at a time and re-checks membership of all five after each add
+ bloomFilter.reset(); // presumably clears the shared filter so the initial misses below hold - confirm against BloomFilter.reset
+ int val1 = "val1".hashCode();
+ int val2 = "val2".hashCode();
+ int val3 = "val3".hashCode();
+ int val4 = "val4".hashCode();
+ int val5 = "val5".hashCode();
+
+ assertFalse(bloomFilter.testHash(val1)); // nothing added yet: every probe is expected to miss
+ assertFalse(bloomFilter.testHash(val2));
+ assertFalse(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val1);
+ assertTrue(bloomFilter.testHash(val1));
+ assertFalse(bloomFilter.testHash(val2)); // the assertFalse checks on not-yet-added values require that these specific hashes produce no false positive
+ assertFalse(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val2);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertFalse(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val3);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertTrue(bloomFilter.testHash(val3));
+ assertFalse(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val4);
+ assertTrue(bloomFilter.testHash(val1));
+ assertTrue(bloomFilter.testHash(val2));
+ assertTrue(bloomFilter.testHash(val3));
+ assertTrue(bloomFilter.testHash(val4));
+ assertFalse(bloomFilter.testHash(val5));
+ bloomFilter.addHash(val5);
+ assertTrue(bloomFilter.testHash(val1)); // all five values added: every probe must hit
+ assertTrue(bloomFilter.testHash(val2));
+ assertTrue(bloomFilter.testHash(val3));
+ assertTrue(bloomFilter.testHash(val4));
+ assertTrue(bloomFilter.testHash(val5));
+ }
+}
\ No newline at end of file