You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/18 04:20:14 UTC
[flink] branch master updated: [FLINK-13161][table-blink-runtime]
Fix calculation of numBuckets in BinaryHashBucketArea
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 18ace00 [FLINK-13161][table-blink-runtime] Fix calculation of numBuckets in BinaryHashBucketArea
18ace00 is described below
commit 18ace00455aeba00ac459623c46f762900981aec
Author: LouisXu <xu...@163.com>
AuthorDate: Tue Jul 9 13:05:59 2019 +0800
[FLINK-13161][table-blink-runtime] Fix calculation of numBuckets in BinaryHashBucketArea
The original calcucation may lead to some requested segments are wasted.
This closes #9027
---
.../flink/table/runtime/hashtable/BinaryHashBucketArea.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
index 9665b30..e44cc81 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
@@ -164,9 +164,9 @@ public class BinaryHashBucketArea {
this.size = 0;
int minNumBuckets = (int) Math.ceil((estimatedRowCount / loadFactor / NUM_ENTRIES_PER_BUCKET));
- int bucketNumSegs = Math.max(1, Math.min(maxSegs, (minNumBuckets >>> table.bucketsPerSegmentBits) +
- ((minNumBuckets & table.bucketsPerSegmentMask) == 0 ? 0 : 1)));
- int numBuckets = MathUtils.roundDownToPowerOf2(bucketNumSegs << table.bucketsPerSegmentBits);
+ int bucketNumSegs = MathUtils.roundDownToPowerOf2(Math.max(1, Math.min(maxSegs, (minNumBuckets >>> table.bucketsPerSegmentBits) +
+ ((minNumBuckets & table.bucketsPerSegmentMask) == 0 ? 0 : 1))));
+ int numBuckets = bucketNumSegs << table.bucketsPerSegmentBits;
int threshold = (int) (numBuckets * NUM_ENTRIES_PER_BUCKET * loadFactor);
@@ -185,7 +185,7 @@ public class BinaryHashBucketArea {
private void setNewBuckets(MemorySegment[] buckets, int numBuckets, int threshold) {
this.buckets = buckets;
- checkArgument(MathUtils.isPowerOf2(numBuckets));
+ checkArgument(MathUtils.isPowerOf2(buckets.length));
this.numBuckets = numBuckets;
this.numBucketsMask = numBuckets - 1;
this.overflowSegments = new MemorySegment[2];
@@ -203,7 +203,7 @@ public class BinaryHashBucketArea {
int oldNumBuckets = numBuckets;
MemorySegment[] oldOverflowSegments = overflowSegments;
int newNumSegs = oldBuckets.length * 2;
- int newNumBuckets = MathUtils.roundDownToPowerOf2(newNumSegs << table.bucketsPerSegmentBits);
+ int newNumBuckets = newNumSegs << table.bucketsPerSegmentBits;
int newThreshold = (int) (newNumBuckets * NUM_ENTRIES_PER_BUCKET * loadFactor);
// We can't resize if not spillingAllowed and there are not enough buffers.