Posted to commits@flink.apache.org by se...@apache.org on 2015/08/06 18:52:17 UTC

[3/5] flink git commit: [FLINK-2240] Add bloom filter to filter probe records during hash join.

[FLINK-2240] Add bloom filter to filter probe records during hash join.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61dcae39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61dcae39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61dcae39

Branch: refs/heads/master
Commit: 61dcae391cb3b45ba3aff47d4d9163889d2958a4
Parents: 685086a
Author: chengxiang li <ch...@intel.com>
Authored: Fri Jul 3 23:53:47 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 6 17:14:39 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  13 +-
 .../operators/hash/MutableHashTable.java        | 145 +++++++++-
 .../runtime/operators/util/BloomFilter.java     | 226 ++++++++++++++++
 .../MutableHashTablePerformanceBenchmark.java   | 268 +++++++++++++++++++
 .../runtime/operators/util/BloomFilterTest.java | 162 +++++++++++
 5 files changed, 806 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index c76741b..dad2d99 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,7 @@ public final class ConfigConstants {
 	 * for hybrid hash joins. 
 	 */
 	public static final String DEFAULT_SPILLING_MAX_FAN_KEY = "taskmanager.runtime.max-fan";
-
+	
 	/**
 	 * Key for the default spilling threshold. When more than the threshold memory of the sort buffers is full, the
 	 * sorter will start spilling to disk.
@@ -190,6 +190,12 @@ public final class ConfigConstants {
 	 * A value of 0 indicates infinite waiting.
 	 */
 	public static final String FS_STREAM_OPENING_TIMEOUT_KEY = "taskmanager.runtime.fs_timeout";
+	
+	/**
+	 * Whether to enable a bloom filter that filters probe records before they are spilled to disk
+	 * during the probe phase, in order to minimize the number of spilled probe records.
+	 */
+	public static final String HASHJOIN_ENABLE_BLOOMFILTER = "hashjoin.bloomfilter.enabled";
 
 	// ------------------------ YARN Configuration ------------------------
 
@@ -552,6 +558,11 @@ public final class ConfigConstants {
 	 * The default timeout for filesystem stream opening: infinite (means max long milliseconds).
 	 */
 	public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
+	
+	/**
+	 * The bloom filter is enabled by default, as it improves hash join performance most of the time.
+	 */
+	public static final boolean DEFAULT_HASHJOIN_ENABLE_BLOOMFILTER = true;
 
 	// ------------------------ YARN Configuration ------------------------
 

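For context, the new flag is read through GlobalConfiguration inside MutableHashTable (see the diff
below), so it can be toggled per process the same way the benchmark in this commit does it. A minimal
sketch, reusing only classes that appear in this diff:

    // disable the bloom filter for hash joins (it is enabled by default)
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, false);
    GlobalConfiguration.includeConfiguration(conf);

Setting "hashjoin.bloomfilter.enabled: false" in flink-conf.yaml should have the same effect, since
GlobalConfiguration is loaded from that file on the task managers.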
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 7f07cfb..4a57986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.hash;
 
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.iterative.io.HashPartitionIterator;
+import org.apache.flink.runtime.operators.util.BloomFilter;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -194,6 +195,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * Constant for the bucket status, indicating that the bucket is in memory.
 	 */
 	private static final byte BUCKET_STATUS_IN_MEMORY = 0;
+
+	/**
+	 * Constant for the bucket status, indicating that the bucket holds a bloom filter.
+	 */
+	private static final byte BUCKET_STATUS_IN_FILTER = 1;
 	
 	// ------------------------------------------------------------------------
 	//                              Members
@@ -348,6 +354,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	
 	private boolean running = true;
 
+	private BloomFilter bloomFilter;
+	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
 	// ------------------------------------------------------------------------
@@ -469,12 +477,19 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				this.recordComparator.setReference(next);
 				this.bucketIterator.set(bucket, p.overflowSegments, p, hash, bucketInSegmentOffset);
 				return true;
-			}
-			else {
-				p.insertIntoProbeBuffer(next);
+			} else {
+				byte status = bucket.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+				if (status == BUCKET_STATUS_IN_FILTER) {
+					this.bloomFilter.setBitsLocation(bucket, bucketInSegmentOffset + BUCKET_HEADER_LENGTH);
+					// Use the bloom filter to drop probe records that cannot match any key in the spilled build-side buckets.
+					if (this.bloomFilter.testHash(hash)) {
+						p.insertIntoProbeBuffer(next);
+					}
+				} else {
+					p.insertIntoProbeBuffer(next);
+				}
 			}
 		}
-		
 		// -------------- partition done ---------------
 		
 		return false;
@@ -710,6 +725,27 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			p.finalizeBuildPhase(this.ioManager, this.currentEnumerator, this.writeBehindBuffers);
 		}
 	}
+
+	private void initBloomFilter(int numBuckets) {
+		int avgNumRecordsPerBucket = getEstimatedMaxBucketEntries(this.availableMemory.size(), this.segmentSize,
+			numBuckets, this.avgRecordLen);
+		// Use the entire bucket, except for the bucket header, as the bloom filter's bit set.
+		int byteSize = HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH;
+		this.bloomFilter = new BloomFilter(avgNumRecordsPerBucket, byteSize);
+		if (LOG.isDebugEnabled()) {
+			double fpp = BloomFilter.estimateFalsePositiveProbability(avgNumRecordsPerBucket, byteSize << 3);
+			LOG.debug(String.format("Created BloomFilter with average input entries per bucket [%d], byte size [%d], false positive probability [%f].",
+				avgNumRecordsPerBucket, byteSize, fpp));
+		}
+	}
+
+	final private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
+		final long totalSize = ((long) bufferSize) * numBuffers;
+		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+		final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
+		final long maxNumRecordsPerBucket = maxNumRecordsStorable / numBuckets;
+		return (int) maxNumRecordsPerBucket;
+	}
 	
 	/**
 	 * @param p
@@ -816,7 +852,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				final int hashCode = hash(btComparator.hash(rec), nextRecursionLevel);
 				insertIntoTable(rec, hashCode);
 			}
-
+			
 			// finalize the partitions
 			for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
 				HashPartition<BT, PT> part = this.partitionsBeingBuilt.get(i);
@@ -853,6 +889,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		if (pointer != -1) {
 			// record was inserted into an in-memory partition. a pointer must be inserted into the buckets
 			insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer);
+		} else {
+			byte status = bucket.get(bucketInSegmentPos + HEADER_STATUS_OFFSET);
+			if (status == BUCKET_STATUS_IN_FILTER) {
+				// The partition has been spilled, so point the bloom filter's bit set at the current bucket
+				// and add the record's hash code to the filter.
+				this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+				this.bloomFilter.addHash(hashCode);
+			}
 		}
 	}
 	
@@ -1047,6 +1091,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		this.buckets = table;
 		this.numBuckets = numBuckets;
+		
+		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEFAULT_HASHJOIN_ENABLE_BLOOMFILTER);
+		if (enableBloomFilter) {
+			initBloomFilter(numBuckets);
+		}
 	}
 	
 	/**
@@ -1088,6 +1138,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		final HashPartition<BT, PT> p = partitions.get(largestPartNum);
 		
+		boolean enableBloomFilter = GlobalConfiguration.getBoolean(
+			ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, ConfigConstants.DEFAULT_HASHJOIN_ENABLE_BLOOMFILTER);
+		if (enableBloomFilter) {
+			buildBloomFilterForBucketsInPartition(largestPartNum, p);
+		}
+		
 		// spill the partition
 		int numBuffersFreed = p.spillPartition(this.availableMemory, this.ioManager, 
 										this.currentEnumerator.next(), this.writeBehindBuffers);
@@ -1101,6 +1157,81 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		return largestPartNum;
 	}
 	
+	final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
+		// Find all the buckets that belong to this partition, and build a bloom filter for each bucket (including its overflow buckets).
+		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
+		for (MemorySegment segment : this.buckets) {
+			for (int i = 0; i < bucketsPerSegment; i++) {
+				final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+				byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
+				if (partitionNumber == partNum) {
+					byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
+					if (status == BUCKET_STATUS_IN_MEMORY) {
+						buildBloomFilterForBucket(bucketInSegmentOffset, segment, partition);
+					}
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Uses all the bucket memory except the bucket header as the bloom filter's bit set, and builds the
+	 * bloom filter from the hash codes of the build-side records.
+	 *
+	 * @param bucketInSegmentPos offset of the bucket within its memory segment
+	 * @param bucket memory segment that holds the bucket
+	 * @param p partition the bucket belongs to
+	 */
+	final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+		int[] hashCodes = new int[count];
+		// The hash codes and the bloom filter occupy the same bytes, so read all hash codes out first and then overwrite them with the bloom filter bits.
+		for (int i = 0; i < count; i++) {
+			hashCodes[i] = bucket.getInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+		}
+		this.bloomFilter.setBitsLocation(bucket, bucketInSegmentPos + BUCKET_HEADER_LENGTH);
+		for (int hashCode : hashCodes) {
+			this.bloomFilter.addHash(hashCode);
+		}
+		buildBloomFilterForExtraOverflowSegments(bucketInSegmentPos, bucket, p);
+	}
+	
+	final private void buildBloomFilterForExtraOverflowSegments(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
+		int totalCount = 0;
+		boolean skip = false;
+		long forwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
+		while (forwardPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
+			final int overflowSegNum = (int) (forwardPointer >>> 32);
+			if (overflowSegNum < 0 || overflowSegNum >= p.numOverflowSegments) {
+				skip = true;
+				break;
+			}
+			MemorySegment overflowSegment = p.overflowSegments[overflowSegNum];
+			int bucketInOverflowSegmentOffset = (int) (forwardPointer & 0xffffffff);
+			
+			final int count = overflowSegment.getShort(bucketInOverflowSegmentOffset + HEADER_COUNT_OFFSET);
+			totalCount += count;
+			// The bloom filter per bucket has 112 * 8 bits. Once the expected number of input entries exceeds 2048,
+			// the false positive probability rises above 0.9, which makes the bloom filter an overhead instead of an optimization.
+			if (totalCount > 2048) {
+				skip = true;
+				break;
+			}
+			
+			for (int i = 0; i < count; i++) {
+				int hashCode = overflowSegment.getInt(bucketInOverflowSegmentOffset + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN);
+				this.bloomFilter.addHash(hashCode);
+			}
+			
+			forwardPointer = overflowSegment.getLong(bucketInOverflowSegmentOffset + HEADER_FORWARD_OFFSET);
+			
+		}
+		
+		if (!skip) {
+			bucket.put(bucketInSegmentPos + HEADER_STATUS_OFFSET, BUCKET_STATUS_IN_FILTER);
+		}
+	}
+
 	/**
 	 * This method makes sure that at least a certain number of memory segments is in the list of free segments.
 	 * Free memory can be in the list of free segments, or in the return-queue where segments used to write behind are

http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
new file mode 100644
index 0000000..947a56b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.core.memory.MemorySegment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * BloomFilter is a probabilistic data structure for set membership checks. Bloom filters are
+ * highly space efficient compared to a HashSet. Because of their probabilistic nature, false
+ * positives (the element is not in the filter, but test() returns true) are possible, while false
+ * negatives are not (if the element is present, test() never returns false). The false positive
+ * probability is configurable; the lower it is, the larger the storage requirement. Bloom filters
+ * are sensitive to the number of inserted elements: the expected number of entries must be
+ * specified when the filter is created, and if the number of insertions exceeds it, the false
+ * positive probability increases accordingly.
+ * <p/>
+ * Internally, this implementation uses a MemorySegment to store the BitSet. BloomFilter and
+ * BitSet are designed to be switched between different MemorySegments, so that Flink can share
+ * the same BloomFilter/BitSet object instance across different bloom filters.
+ * <p/>
+ * Part of this class is based on the implementation in the Apache Hive project:
+ * https://github.com/apache/hive/blob/master/common/src/java/org/apache/hive/common/util/BloomFilter.java
+ */
+
+public class BloomFilter {
+	
+	protected BitSet bitSet;
+	protected int expectedEntries;
+	protected int numHashFunctions;
+	
+	public BloomFilter(int expectedEntries, int byteSize) {
+		checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+		this.expectedEntries = expectedEntries;
+		this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, byteSize << 3);
+		this.bitSet = new BitSet(byteSize);
+	}
+	
+	public void setBitsLocation(MemorySegment memorySegment, int offset) {
+		this.bitSet.setMemorySegment(memorySegment, offset);
+	}
+	
+	/**
+	 * Compute the optimal number of bits for the given number of input entries and expected false positive probability.
+	 *
+	 * @param inputEntries expected number of input entries
+	 * @param fpp expected false positive probability
+	 * @return optimal number of bits
+	 */
+	public static int optimalNumOfBits(long inputEntries, double fpp) {
+		int numBits = (int) (-inputEntries * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+		return numBits;
+	}
+	
+	/**
+	 * Compute the false positive probability based on the given number of input entries and the bit size.
+	 * Note: this is only the mathematical expectation; the actual false positive rate is not guaranteed to stay below this value.
+	 *
+	 * @param inputEntries expected number of input entries
+	 * @param bitSize size of the bit set, in bits
+	 * @return estimated false positive probability
+	 */
+	public static double estimateFalsePositiveProbability(long inputEntries, int bitSize) {
+		int numFunction = optimalNumOfHashFunctions(inputEntries, bitSize);
+		double p = Math.pow(Math.E, -(double) numFunction * inputEntries / bitSize);
+		double estimatedFPP = Math.pow(1 - p, numFunction);
+		return estimatedFPP;
+	}
+	
+	/**
+	 * Compute the optimal number of hash functions for the given number of input entries and bit size,
+	 * i.e. the number that minimizes the false positive probability.
+	 *
+	 * @param expectEntries expected number of input entries
+	 * @param bitSize size of the bit set, in bits
+	 * @return optimal number of hash functions
+	 */
+	static int optimalNumOfHashFunctions(long expectEntries, long bitSize) {
+		return Math.max(1, (int) Math.round((double) bitSize / expectEntries * Math.log(2)));
+	}
+	
+	public void addHash(int hash32) {
+		int hash1 = hash32;
+		int hash2 = hash32 >>> 16;
+		
+		for (int i = 1; i <= numHashFunctions; i++) {
+			int combinedHash = hash1 + (i * hash2);
+			// hashcode should be positive, flip all the bits if it's negative
+			if (combinedHash < 0) {
+				combinedHash = ~combinedHash;
+			}
+			int pos = combinedHash % bitSet.bitSize();
+			bitSet.set(pos);
+		}
+	}
+		
+	public boolean testHash(int hash32) {
+		int hash1 = hash32;
+		int hash2 = hash32 >>> 16;
+		
+		for (int i = 1; i <= numHashFunctions; i++) {
+			int combinedHash = hash1 + (i * hash2);
+			// hashcode should be positive, flip all the bits if it's negative
+			if (combinedHash < 0) {
+				combinedHash = ~combinedHash;
+			}
+			int pos = combinedHash % bitSet.bitSize();
+			if (!bitSet.get(pos)) {
+				return false;
+			}
+		}
+		return true;
+	}
+	
+	public void reset() {
+		this.bitSet.clear();
+	}
+	
+	@Override
+	public String toString() {
+		StringBuilder output = new StringBuilder();
+		output.append("BloomFilter:\n");
+		output.append("\thash function number:").append(numHashFunctions).append("\n");
+		output.append(bitSet);
+		return output.toString();
+	}
+	
+	/**
+	 * Bare metal bit set implementation. For performance reasons, this implementation does not check
+	 * for index bounds nor expand the bit set size if the specified index is greater than the size.
+	 */
+	public class BitSet {
+		private MemorySegment memorySegment;
+		// MemorySegment byte array offset.
+		private int offset;
+		// MemorySegment byte size.
+		private int length;
+		private final int LONG_POSITION_MASK = 0xffffffc0;
+		
+		public BitSet(int byteSize) {
+			Preconditions.checkArgument(byteSize > 0, "byte size should be greater than 0.");
+			Preconditions.checkArgument(byteSize << 29 == 0, "byte size should be an integral multiple of the long size (8 bytes).");
+			this.length = byteSize;
+		}
+		
+		public void setMemorySegment(MemorySegment memorySegment, int offset) {
+			this.memorySegment = memorySegment;
+			this.offset = offset;
+		}
+		
+		/**
+		 * Sets the bit at specified index.
+		 *
+		 * @param index - position
+		 */
+		public void set(int index) {
+			int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+			long current = memorySegment.getLong(offset + longIndex);
+			current |= (1L << index);
+			memorySegment.putLong(offset + longIndex, current);
+		}
+		
+		/**
+		 * Returns true if the bit is set in the specified index.
+		 *
+		 * @param index - position
+		 * @return - value at the bit position
+		 */
+		public boolean get(int index) {
+			int longIndex = (index & LONG_POSITION_MASK) >>> 3;
+			long current = memorySegment.getLong(offset + longIndex);
+			return (current & (1L << index)) != 0;
+		}
+		
+		/**
+		 * Number of bits
+		 */
+		public int bitSize() {
+			return length << 3;
+		}
+		
+		public MemorySegment getMemorySegment() {
+			return this.memorySegment;
+		}
+		
+		/**
+		 * Clear the bit set.
+		 */
+		public void clear() {
+			long zeroValue = 0L;
+			for (int i = 0; i < (length / 8); i++) {
+				memorySegment.putLong(offset + i * 8, zeroValue);
+			}
+		}
+		
+		@Override
+		public String toString() {
+			StringBuilder output = new StringBuilder();
+			output.append("BitSet:\n");
+			output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
+			output.append("\tOffset:").append(offset).append("\n");
+			output.append("\tLength:").append(length).append("\n");
+			return output.toString();
+		}
+	}
+}
\ No newline at end of file

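The basic usage pattern of the new class is exercised by BloomFilterTest further below; in sketch
form (the sizes here are illustrative, and the byte-array MemorySegment constructor is the one the
test itself uses):

    // size the bit set for ~1024 entries at 5% false positives, rounded up to whole longs
    int bitsSize = BloomFilter.optimalNumOfBits(1024, 0.05);
    bitsSize += Long.SIZE - (bitsSize % Long.SIZE);
    int byteSize = bitsSize >>> 3;

    BloomFilter filter = new BloomFilter(1024, byteSize);
    // the filter does not own any memory; point its bit set at a MemorySegment region first
    filter.setBitsLocation(new MemorySegment(new byte[byteSize]), 0);

    filter.addHash("val1".hashCode());
    filter.testHash("val1".hashCode());   // true
    filter.testHash("val2".hashCode());   // false, barring a false positive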
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
new file mode 100644
index 0000000..452e4c1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.types.StringPair;
+import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
+import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class MutableHashTablePerformanceBenchmark {
+	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+	
+	private MemoryManager memManager;
+	private IOManager ioManager;
+	
+	private TypeSerializer<StringPair> pairBuildSideAccesssor;
+	private TypeSerializer<StringPair> pairProbeSideAccesssor;
+	private TypeComparator<StringPair> pairBuildSideComparator;
+	private TypeComparator<StringPair> pairProbeSideComparator;
+	private TypePairComparator<StringPair, StringPair> pairComparator;
+	
+	private static final String COMMENT = "this comment should contain 96 bytes of data; 100 with another integer value and separator char.";
+	
+	
+	@Before
+	public void setup() {
+		this.pairBuildSideAccesssor = new StringPairSerializer();
+		this.pairProbeSideAccesssor = new StringPairSerializer();
+		this.pairBuildSideComparator = new StringPairComparator();
+		this.pairProbeSideComparator = new StringPairComparator();
+		this.pairComparator = new StringPairPairComparator();
+		
+		this.memManager = new DefaultMemoryManager(64 * 1024 * 1024, 1);
+		this.ioManager = new IOManagerAsync();
+	}
+	
+	@After
+	public void tearDown() {
+		// shut down I/O manager and Memory Manager and verify the correct shutdown
+		this.ioManager.shutdown();
+		if (!this.ioManager.isProperlyShutDown()) {
+			fail("I/O manager was not property shut down.");
+		}
+		if (!this.memManager.verifyEmpty()) {
+			fail("Not all memory was properly released to the memory manager --> Memory Leak.");
+		}
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance1() throws IOException {
+		// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 10;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 500000;
+		
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin2:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance2() throws IOException {
+		// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 5;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 1000000;
+		
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin3:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance3() throws IOException {
+		// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 2;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = 2500000;
+		
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		
+		System.out.println("HybridHashJoin4:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	@Test
+	public void compareMutableHashTablePerformance4() throws IOException {
+		// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
+		// create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
+		int buildSize = 1000000;
+		int buildStep = 1;
+		int buildScope = buildStep * buildSize;
+		// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
+		int probeSize = 5000000;
+		int probeStep = 1;
+		int probeScope = buildSize;
+		
+		int expectedResult = probeSize / buildStep;
+		
+		long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
+		long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
+		
+		System.out.println("HybridHashJoin5:");
+		System.out.println("Build input size: " + 100 * buildSize);
+		System.out.println("Probe input size: " + 100 * probeSize);
+		System.out.println("Available memory: " + this.memManager.getMemorySize());
+		System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
+		System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
+	}
+	
+	private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
+		int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
+		
+		InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
+		InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
+		
+		Configuration conf = new Configuration();
+		conf.setBoolean(ConfigConstants.HASHJOIN_ENABLE_BLOOMFILTER, enableBloomFilter);
+		GlobalConfiguration.includeConfiguration(conf);
+		
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required to perform the hash join on these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize()));
+		} catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return -1;
+		}
+		
+		// ----------------------------------------------------------------------------------------
+		
+		long start = System.currentTimeMillis();
+		final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
+			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager);
+		join.open(buildIterator, probeIterator);
+		
+		final StringPair recordReuse = new StringPair();
+		int numRecordsInJoinResult = 0;
+		
+		while (join.nextRecord()) {
+			MutableHashTable.HashBucketIterator<StringPair, StringPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
+		
+		join.close();
+		long cost = System.currentTimeMillis() - start;
+		// ----------------------------------------------------------------------------------------
+		
+		this.memManager.release(join.getFreedMemory());
+		return cost;
+	}
+	
+	
+	static class InputIterator implements MutableObjectIterator<StringPair> {
+		
+		private int numLeft;
+		private int distance;
+		private int scope;
+		
+		public InputIterator(int size, int distance, int scope) {
+			this.numLeft = size;
+			this.distance = distance;
+			this.scope = scope;
+		}
+		
+		@Override
+		public StringPair next(StringPair reuse) throws IOException {
+			if (this.numLeft > 0) {
+				numLeft--;
+				int currentKey = (numLeft * distance) % scope;
+				reuse.setKey(Integer.toString(currentKey));
+				reuse.setValue(COMMENT);
+				return reuse;
+			} else {
+				return null;
+			}
+		}
+		
+		@Override
+		public StringPair next() throws IOException {
+			return next(new StringPair());
+		}
+	}
+}

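A note on how the benchmark's filter rates are derived, using compareMutableHashTablePerformance1 as
an example: the build side writes buildSize = 1,000,000 records with keys 0, 10, 20, ..., so its keys
span buildScope = 10,000,000, while the probe side cycles probeSize = 5,000,000 records through keys
0 .. 999,999 (probeScope = buildSize). Only 1 in 10 probe keys can hit a build key, so about
1 - probeScope / buildScope = 90% of the probe records that reach the spill path are filterable, and
the expected join result is 100,000 matching keys * 5 probe occurrences each = 500,000, which is the
asserted expectedResult.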
http://git-wip-us.apache.org/repos/asf/flink/blob/61dcae39/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
new file mode 100644
index 0000000..cbbeca0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BloomFilterTest {
+	
+	private static BloomFilter bloomFilter;
+	private static final int INPUT_SIZE = 1024;
+	private static final double FALSE_POSITIVE_PROBABILITY = 0.05;
+	
+	@BeforeClass
+	public static void init() {
+		int bitsSize = BloomFilter.optimalNumOfBits(INPUT_SIZE, FALSE_POSITIVE_PROBABILITY);
+		bitsSize = bitsSize + (Long.SIZE - (bitsSize % Long.SIZE));
+		int byteSize = bitsSize >>> 3;
+		MemorySegment memorySegment = new MemorySegment(new byte[byteSize]);
+		bloomFilter = new BloomFilter(INPUT_SIZE, byteSize);
+		bloomFilter.setBitsLocation(memorySegment, 0);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments1() {
+		new BloomFilter(-1, 128);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments2() {
+		new BloomFilter(0, 128);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments3() {
+		new BloomFilter(1024, -1);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments4() {
+		new BloomFilter(1024, 0);
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testBloomFilterArguments5() {
+		new BloomFilter(1024, 21);
+	}
+	
+	@Test
+	public void testBloomNumBits() {
+		assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));
+		assertEquals(0, BloomFilter.optimalNumOfBits(0, 1));
+		assertEquals(0, BloomFilter.optimalNumOfBits(1, 1));
+		assertEquals(7, BloomFilter.optimalNumOfBits(1, 0.03));
+		assertEquals(72, BloomFilter.optimalNumOfBits(10, 0.03));
+		assertEquals(729, BloomFilter.optimalNumOfBits(100, 0.03));
+		assertEquals(7298, BloomFilter.optimalNumOfBits(1000, 0.03));
+		assertEquals(72984, BloomFilter.optimalNumOfBits(10000, 0.03));
+		assertEquals(729844, BloomFilter.optimalNumOfBits(100000, 0.03));
+		assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+		assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+	}
+	
+	@Test
+	public void testBloomFilterNumHashFunctions() {
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(-1, -1));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(0, 0));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 0));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10, 10));
+		assertEquals(7, BloomFilter.optimalNumOfHashFunctions(10, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(10000, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(100000, 100));
+		assertEquals(1, BloomFilter.optimalNumOfHashFunctions(1000000, 100));
+	}
+	
+	@Test
+	public void testBloomFilterFalsePositiveProbability() {
+		assertEquals(7298440, BloomFilter.optimalNumOfBits(1000000, 0.03));
+		assertEquals(6235224, BloomFilter.optimalNumOfBits(1000000, 0.05));
+		assertEquals(4792529, BloomFilter.optimalNumOfBits(1000000, 0.1));
+		assertEquals(3349834, BloomFilter.optimalNumOfBits(1000000, 0.2));
+		assertEquals(2505911, BloomFilter.optimalNumOfBits(1000000, 0.3));
+		assertEquals(1907139, BloomFilter.optimalNumOfBits(1000000, 0.4));
+		
+		// Make sure the estimated fpp error is less than 1%.
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 7298440) - 0.03) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 6235224) - 0.05) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 4792529) - 0.1) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 3349834) - 0.2) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 2505911) - 0.3) < 0.01);
+		assertTrue(Math.abs(BloomFilter.estimateFalsePositiveProbability(1000000, 1907139) - 0.4) < 0.01);
+	}
+	
+	@Test
+	public void testHashcodeInput() {
+		bloomFilter.reset();
+		int val1 = "val1".hashCode();
+		int val2 = "val2".hashCode();
+		int val3 = "val3".hashCode();
+		int val4 = "val4".hashCode();
+		int val5 = "val5".hashCode();
+		
+		assertFalse(bloomFilter.testHash(val1));
+		assertFalse(bloomFilter.testHash(val2));
+		assertFalse(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val1);
+		assertTrue(bloomFilter.testHash(val1));
+		assertFalse(bloomFilter.testHash(val2));
+		assertFalse(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val2);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertFalse(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val3);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertTrue(bloomFilter.testHash(val3));
+		assertFalse(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val4);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertTrue(bloomFilter.testHash(val3));
+		assertTrue(bloomFilter.testHash(val4));
+		assertFalse(bloomFilter.testHash(val5));
+		bloomFilter.addHash(val5);
+		assertTrue(bloomFilter.testHash(val1));
+		assertTrue(bloomFilter.testHash(val2));
+		assertTrue(bloomFilter.testHash(val3));
+		assertTrue(bloomFilter.testHash(val4));
+		assertTrue(bloomFilter.testHash(val5));
+	}
+}
\ No newline at end of file