Posted to commits@flink.apache.org by se...@apache.org on 2015/09/02 15:54:38 UTC

[2/4] flink git commit: [FLINK-2545] add bucket member verification while building the bloom filter.

[FLINK-2545] add bucket member verification while building the bloom filter.

This closes #1067


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e6e4de5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e6e4de5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e6e4de5

Branch: refs/heads/master
Commit: 2e6e4de5d1d2b5123f4311493763fd84f52779ab
Parents: dd9979f
Author: chengxiang li <ch...@intel.com>
Authored: Thu Aug 27 14:28:38 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 2 14:12:55 2015 +0200

----------------------------------------------------------------------
 .../operators/hash/MutableHashTable.java        | 15 ++++-
 .../runtime/operators/hash/HashTableITCase.java | 59 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e6e4de5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 7661808..2ad01aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -1120,9 +1120,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
 		// Find all the buckets which belong to this partition, and build a bloom filter for each bucket (including its overflow buckets).
 		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
-		for (MemorySegment segment : this.buckets) {
-			for (int i = 0; i < bucketsPerSegment; i++) {
-				final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+
+		int numSegs = this.buckets.length;
+		// go over all segments that are part of the table
+		for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
+			final MemorySegment segment = this.buckets[i];
+			// go over all buckets in the segment
+			for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
+				final int bucketInSegmentOffset = k * HASH_BUCKET_SIZE;
 				byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
 				if (partitionNumber == partNum) {
 					byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
@@ -1140,6 +1145,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 */
 	final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
 		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+		if (count <= 0) {
+			return;
+		}
+
 		int[] hashCodes = new int[count];
 		// As the hash codes and the bloom filter occupy the same bytes, we first read all hash codes out and then write the bloom filter back.
 		for (int i = 0; i < count; i++) {

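For readers skimming the hunks above: the fix has two parts. The outer loop now tracks a global bucket index and stops at numBuckets, so the unused bucket slots at the end of the last segment (which may hold arbitrary bytes) are never interpreted as buckets of some partition; and buildBloomFilterForBucket returns early when the recorded count is not positive. The standalone Java sketch below is not Flink code: the constants, the byte[]-based "segments", and the simplified header layout (partition byte at offset 0, record count as a short at offset 2) are assumptions modeled loosely on MutableHashTable, and the point is only to illustrate the two guards.

// Standalone sketch (not Flink code) of the bounded bucket scan introduced by this
// commit. Constants and the byte[]-based segments are simplifying assumptions.
public class BoundedBucketScanSketch {

    static final int HASH_BUCKET_SIZE = 128;       // assumed bucket size in bytes
    static final int HEADER_PARTITION_OFFSET = 0;  // assumed: partition number (byte)
    static final int HEADER_COUNT_OFFSET = 2;      // assumed: record count (short)

    // Visits every valid bucket of partition 'partNum' and reports where a bloom
    // filter would be built. 'bucket' counts buckets across all segments, so the
    // loop stops at numBuckets even if the last segment has unused slots at its end.
    static void buildBloomFiltersForPartition(byte[][] segments, int numBuckets,
                                              int bucketsPerSegment, int partNum) {
        for (int i = 0, bucket = 0; i < segments.length && bucket < numBuckets; i++) {
            byte[] segment = segments[i];
            for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
                int offset = k * HASH_BUCKET_SIZE;
                if (segment[offset + HEADER_PARTITION_OFFSET] != partNum) {
                    continue; // bucket belongs to another partition
                }
                short count = readShort(segment, offset + HEADER_COUNT_OFFSET);
                if (count <= 0) {
                    continue; // empty (or garbage) bucket: nothing to hash
                }
                System.out.printf("bucket %d: would build a bloom filter over %d records%n",
                        bucket, count);
            }
        }
    }

    static short readShort(byte[] segment, int pos) {
        return (short) ((segment[pos] & 0xFF) | ((segment[pos + 1] & 0xFF) << 8));
    }

    public static void main(String[] args) {
        int bucketsPerSegment = 4;
        int numBuckets = 5; // only 5 of the 8 slots across two segments are real buckets
        byte[][] segments = new byte[2][bucketsPerSegment * HASH_BUCKET_SIZE];

        // one real bucket: partition 0 with 3 records
        segments[0][HEADER_PARTITION_OFFSET] = 0;
        segments[0][HEADER_COUNT_OFFSET] = 3;

        // poison the unused tail of the last segment, like the new test does:
        // it looks like a bucket of partition 0 whose count is -1
        int garbage = 3 * HASH_BUCKET_SIZE;
        segments[1][garbage + HEADER_PARTITION_OFFSET] = 0;
        segments[1][garbage + HEADER_COUNT_OFFSET] = (byte) 0xFF;
        segments[1][garbage + HEADER_COUNT_OFFSET + 1] = (byte) 0xFF;

        // prints only "bucket 0: ..." -- the poisoned slot is never visited
        buildBloomFiltersForPartition(segments, numBuckets, bucketsPerSegment, 0);
    }
}

The real buildBloomFilterForBucketsInPartition additionally inspects the status byte and, per its comment, covers the overflow buckets as well; the sketch leaves both out.
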
http://git-wip-us.apache.org/repos/asf/flink/blob/2e6e4de5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 233fa4d..52f6ffc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -1476,6 +1476,65 @@ public class HashTableITCase {
 
 		this.memManager.release(join.getFreedMemory());
 	}
+
+	@Test
+	public void testBucketsNotFulfillSegment() throws Exception {
+		final int NUM_KEYS = 10000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+
+		// create a build input that gives 30000 pairs with 3 values sharing the same key
+		MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+		// create a probe input that gives 100000 pairs with 10 values sharing a key
+		MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			// 33 is the minimum number of pages required to perform the hash join on these inputs
+			memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// For FLINK-2545, the bucket data may not fill its whole buffer: for example, the buffer may hold 256 buckets,
+		// while the hash table only assigns 250 buckets to it. The unused buffer bytes may contain arbitrary data, which
+		// can corrupt the hash table if it is not skipped. To mock this, put invalid bucket data (partition=1, inMemory=true, count=-1)
+		// at the end of each buffer.
+		for (MemorySegment segment : memSegments) {
+			int newBucketOffset = segment.size() - 128;
+			// initialize the header fields
+			segment.put(newBucketOffset + 0, (byte)0);
+			segment.put(newBucketOffset + 1, (byte)0);
+			segment.putShort(newBucketOffset + 2, (short) -1);
+			segment.putLong(newBucketOffset + 4, ~0x0L);
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+			memSegments, ioManager);
+		join.open(buildInput, probeInput);
+
+		final IntPair recordReuse = new IntPair();
+		int numRecordsInJoinResult = 0;
+
+		while (join.nextRecord()) {
+			HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+			while (buildSide.next(recordReuse) != null) {
+				numRecordsInJoinResult++;
+			}
+		}
+		Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+		join.close();
+		this.memManager.release(join.getFreedMemory());
+	}
 	
 	// ============================================================================================
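
The magic offsets in the test's poisoning loop (newBucketOffset + 0, + 1, + 2, + 4) follow the bucket header layout that MutableHashTable reads in the first hunk: a partition byte, a status byte, a 2-byte record count, and a pointer to the overflow bucket, inside a 128-byte bucket (hence "segment.size() - 128"). The sketch below is again not Flink code: a plain ByteBuffer stands in for a MemorySegment, and the field-name comments are assumptions inferred from those offsets; only the written values are taken from the test.

import java.nio.ByteBuffer;

// Standalone sketch (not Flink code): what the test's poisoning loop writes into
// the unused tail of each memory segment.
public class PoisonedBucketSketch {

    // assumed bucket size, matching the test's "segment.size() - 128"
    static final int HASH_BUCKET_SIZE = 128;

    // Writes a fake bucket header into the last HASH_BUCKET_SIZE bytes of the segment,
    // mirroring the four put calls in testBucketsNotFulfillSegment().
    static void poisonTail(ByteBuffer segment) {
        int off = segment.capacity() - HASH_BUCKET_SIZE;
        segment.put(off, (byte) 0);             // byte 0: partition number (assumed meaning)
        segment.put(off + 1, (byte) 0);         // byte 1: bucket status (assumed meaning)
        segment.putShort(off + 2, (short) -1);  // bytes 2-3: record count, deliberately invalid
        segment.putLong(off + 4, ~0x0L);        // bytes 4-11: overflow-bucket pointer, "none" (assumed meaning)
    }

    public static void main(String[] args) {
        ByteBuffer segment = ByteBuffer.allocate(32 * 1024); // 32 KiB page, Flink's default segment size
        poisonTail(segment);

        int off = segment.capacity() - HASH_BUCKET_SIZE;
        // Without the FLINK-2545 guards, a scan over the full segment could read this
        // header and try to build a bloom filter over -1 records.
        System.out.println("count field in the poisoned tail bucket: " + segment.getShort(off + 2));
    }
}

With that header written at the tail of every buffer, the join must still produce exactly NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY records, which is what the test's assertion checks.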