Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/26 14:32:16 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #1868: [HUDI-1083] Optimization in determining insert bucket location for a given key

nsivabalan commented on a change in pull request #1868:
URL: https://github.com/apache/hudi/pull/1868#discussion_r460534315



##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = binarySearch(targetBuckets, r);
+      if (index >= 0) {
+        return targetBuckets.get(index).bucketNumber;
+      }
+
+      if (-1 * index - 1 < targetBuckets.size()) {

Review comment:
       Sorry, why do we need this? If the key is not found, why not return the first bucket?
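
   For context on the check being asked about: the JDK binary-search methods return `-(insertionPoint) - 1` when the key is absent, so `-1 * index - 1` recovers the position of the first element greater than the key. Below is a minimal sketch using `Arrays.binarySearch` over hypothetical cumulative weights. The class and method names are illustrative and not from the PR, and it is an assumption that the PR's own `binarySearch` helper follows the same return convention.

```java
import java.util.Arrays;

class InsertionPointDemo {
  // Returns the index of the first element >= r (assuming distinct, sorted values),
  // or the array length if r exceeds every element.
  static int firstIndexAtOrAbove(double[] sortedCumulativeWeights, double r) {
    int index = Arrays.binarySearch(sortedCumulativeWeights, r);
    // Non-negative: exact match. Negative: encodes -(insertionPoint) - 1.
    return index >= 0 ? index : -index - 1;
  }

  public static void main(String[] args) {
    double[] cumulativeWeights = {0.2, 0.5, 1.0}; // hypothetical values
    for (double r : new double[] {0.1, 0.3, 0.5, 1.5}) {
      System.out.println(r + " -> " + firstIndexAtOrAbove(cumulativeWeights, r));
    }
  }
}
```

   With these values, r = 0.3 decodes to index 1 rather than the first bucket, and r = 1.5 decodes to 3, which is past the end of the array. That last case is the one a `< targetBuckets.size()` guard would protect against.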

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucket.java
##########
@@ -18,24 +18,31 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.jetbrains.annotations.NotNull;
+
 import java.io.Serializable;
 
 /**
  * Helper class for an insert bucket along with the weight [0.0, 1.0] that defines the amount of incoming inserts that
  * should be allocated to the bucket.
  */
-public class InsertBucket implements Serializable {
+public class InsertBucket implements Serializable, Comparable<InsertBucket> {
 
   int bucketNumber;
-  // fraction of total inserts, that should go into this bucket
-  double weight;
+  // cumulate fraction of total inserts, that should go into this bucket and the previous bucket.
+  double cumulativeWeight;

Review comment:
       I'm not comfortable adding cumulative weights to the InsertBucket class. An insert bucket has no inherent ordering with respect to other buckets; the ordering exists only for UpsertPartitioner's own purposes.
   
   Can we try something like this instead?
   partitionPathToInsertBucketInfo: Map<String, List/Array of Pair<insert bucket number, cumulative weight>>
   insertBucketMap: Map of insert bucket number to InsertBucket object.
   
   getPartition(key) would first look at partitionPathToInsertBucketInfo to fetch the cumulative weights of all insert buckets along with their bucket numbers. Once we have found the target bucket number, we can look it up in the other map.
   
   Or you can store the InsertBucket itself in the value of partitionPathToInsertBucketInfo to avoid the second lookup: Tuple2<cumulativeWeight, InsertBucket>.
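
   A rough sketch of the second variant, where the running total lives in a wrapper pair and the bucket keeps only its own weight. `Bucket` is a simplified stand-in for InsertBucket, and `WeightedBucket`, `toCumulative`, and `pickBucket` are hypothetical names used only for illustration. The lookup is a linear scan for brevity, and falling back to the last bucket is an assumption, not something the PR specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CumulativeBucketSketch {
  // Simplified stand-in for InsertBucket: it only knows its own share of the inserts.
  static final class Bucket {
    final int bucketNumber;
    final double weight;

    Bucket(int bucketNumber, double weight) {
      this.bucketNumber = bucketNumber;
      this.weight = weight;
    }
  }

  // Hypothetical pair type: the ordering-dependent running total is kept outside Bucket.
  static final class WeightedBucket {
    final double cumulativeWeight;
    final Bucket bucket;

    WeightedBucket(double cumulativeWeight, Bucket bucket) {
      this.cumulativeWeight = cumulativeWeight;
      this.bucket = bucket;
    }
  }

  // Walks the buckets in partitioner order and records the running total for each.
  static List<WeightedBucket> toCumulative(List<Bucket> buckets) {
    List<WeightedBucket> result = new ArrayList<>(buckets.size());
    double running = 0.0;
    for (Bucket b : buckets) {
      running += b.weight;
      result.add(new WeightedBucket(running, b));
    }
    return result;
  }

  // Picks the first bucket whose cumulative weight covers r.
  static int pickBucket(List<WeightedBucket> entries, double r) {
    for (WeightedBucket e : entries) {
      if (r <= e.cumulativeWeight) {
        return e.bucket.bucketNumber;
      }
    }
    // Assumed fallback when r is above every cumulative weight.
    return entries.get(entries.size() - 1).bucket.bucketNumber;
  }

  public static void main(String[] args) {
    List<WeightedBucket> entries = toCumulative(Arrays.asList(
        new Bucket(7, 0.25), new Bucket(9, 0.25), new Bucket(11, 0.5)));
    System.out.println(pickBucket(entries, 0.3));
  }
}
```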
   
   
   

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = binarySearch(targetBuckets, r);

Review comment:
       Why not use Collections.binarySearch()?
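
   As a sketch of what that could look like: `Collections.binarySearch(list, key, comparator)` needs a key of the list's element type, so a throwaway probe object carries the value of r. `Entry`, `BY_CUMULATIVE_WEIGHT`, and `findBucket` are hypothetical names, and clamping an out-of-range result to the last bucket is an assumption rather than the PR's behavior.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class CollectionsBinarySearchSketch {
  // Hypothetical element type: a bucket number plus its cumulative weight.
  static final class Entry {
    final int bucketNumber;
    final double cumulativeWeight;

    Entry(int bucketNumber, double cumulativeWeight) {
      this.bucketNumber = bucketNumber;
      this.cumulativeWeight = cumulativeWeight;
    }
  }

  static final Comparator<Entry> BY_CUMULATIVE_WEIGHT =
      Comparator.comparingDouble(e -> e.cumulativeWeight);

  // The list must already be sorted by cumulative weight.
  static int findBucket(List<Entry> sortedEntries, double r) {
    Entry probe = new Entry(-1, r); // only the weight of the probe is compared
    int index = Collections.binarySearch(sortedEntries, probe, BY_CUMULATIVE_WEIGHT);
    if (index < 0) {
      index = -index - 1; // decode the insertion point
    }
    // Assumed policy: clamp to the last bucket if r is above every cumulative weight.
    index = Math.min(index, sortedEntries.size() - 1);
    return sortedEntries.get(index).bucketNumber;
  }

  public static void main(String[] args) {
    List<Entry> entries = Arrays.asList(
        new Entry(3, 0.25), new Entry(4, 0.75), new Entry(5, 1.0));
    System.out.println(findBucket(entries, 0.5));
  }
}
```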

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
##########
@@ -272,21 +275,44 @@ public int getPartition(Object key) {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = binarySearch(targetBuckets, r);
+      if (index >= 0) {
+        return targetBuckets.get(index).bucketNumber;
+      }
+
+      if (-1 * index - 1 < targetBuckets.size()) {

Review comment:
       Also, ideally we should never hit this scenario at all. If our bin packing is good, the cumulative weight of the last entry should be 1.0.
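
   One caveat worth keeping in mind: a running sum of doubles does not always land exactly on 1.0 even when the weights are meant to add up to it. The sketch below shows one possible way to make the last entry exactly 1.0 by pinning it after accumulation. The class and method names are hypothetical, and pinning is an illustrative choice, not something the PR does.

```java
import java.util.Arrays;

class CumulativeWeightCheck {
  // Plain left-to-right sum of the per-bucket weights.
  static double plainSum(double[] weights) {
    double sum = 0.0;
    for (double w : weights) {
      sum += w;
    }
    return sum;
  }

  // Builds running totals and pins the final entry to exactly 1.0.
  static double[] toCumulative(double[] weights) {
    double[] cumulative = new double[weights.length];
    double running = 0.0;
    for (int i = 0; i < weights.length; i++) {
      running += weights[i];
      cumulative[i] = running;
    }
    if (cumulative.length > 0) {
      // Assumption: the weights are intended to sum to 1.0, so any drift is rounding error.
      cumulative[cumulative.length - 1] = 1.0;
    }
    return cumulative;
  }

  public static void main(String[] args) {
    double[] weights = new double[10];
    Arrays.fill(weights, 0.1); // ten equal buckets, hypothetical
    System.out.println(plainSum(weights));
    System.out.println(toCumulative(weights)[9]);
  }
}
```

   With ten weights of 0.1, the plain sum comes out as 0.9999999999999999 in double arithmetic, while the pinned last entry is exactly 1.0.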



