You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/18 04:20:14 UTC

[flink] branch master updated: [FLINK-13161][table-blink-runtime] Fix calculation of numBuckets in BinaryHashBucketArea

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 18ace00  [FLINK-13161][table-blink-runtime] Fix calculation of numBuckets in BinaryHashBucketArea
18ace00 is described below

commit 18ace00455aeba00ac459623c46f762900981aec
Author: LouisXu <xu...@163.com>
AuthorDate: Tue Jul 9 13:05:59 2019 +0800

    [FLINK-13161][table-blink-runtime] Fix calculation of numBuckets in BinaryHashBucketArea
    
    The original calculation may lead to some requested segments being wasted.
    
    This closes #9027
---
 .../flink/table/runtime/hashtable/BinaryHashBucketArea.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
index 9665b30..e44cc81 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
@@ -164,9 +164,9 @@ public class BinaryHashBucketArea {
 		this.size = 0;
 
 		int minNumBuckets = (int) Math.ceil((estimatedRowCount / loadFactor / NUM_ENTRIES_PER_BUCKET));
-		int bucketNumSegs = Math.max(1, Math.min(maxSegs, (minNumBuckets >>> table.bucketsPerSegmentBits) +
-				((minNumBuckets & table.bucketsPerSegmentMask) == 0 ? 0 : 1)));
-		int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs << table.bucketsPerSegmentBits);
+		int bucketNumSegs = MathUtils.roundDownToPowerOf2(Math.max(1, Math.min(maxSegs, (minNumBuckets >>> table.bucketsPerSegmentBits) +
+				((minNumBuckets & table.bucketsPerSegmentMask) == 0 ? 0 : 1))));
+		int numBuckets = bucketNumSegs << table.bucketsPerSegmentBits;
 
 		int threshold = (int) (numBuckets * NUM_ENTRIES_PER_BUCKET * loadFactor);
 
@@ -185,7 +185,7 @@ public class BinaryHashBucketArea {
 
 	private void setNewBuckets(MemorySegment[] buckets, int numBuckets, int threshold) {
 		this.buckets = buckets;
-		checkArgument(MathUtils.isPowerOf2(numBuckets));
+		checkArgument(MathUtils.isPowerOf2(buckets.length));
 		this.numBuckets = numBuckets;
 		this.numBucketsMask = numBuckets - 1;
 		this.overflowSegments = new MemorySegment[2];
@@ -203,7 +203,7 @@ public class BinaryHashBucketArea {
 		int oldNumBuckets = numBuckets;
 		MemorySegment[] oldOverflowSegments = overflowSegments;
 		int newNumSegs = oldBuckets.length * 2;
-		int newNumBuckets = MathUtils.roundDownToPowerOf2(newNumSegs << table.bucketsPerSegmentBits);
+		int newNumBuckets = newNumSegs << table.bucketsPerSegmentBits;
 		int newThreshold = (int) (newNumBuckets * NUM_ENTRIES_PER_BUCKET * loadFactor);
 
 		// We can't resize if not spillingAllowed and there are not enough buffers.