Posted to commits@hudi.apache.org by si...@apache.org on 2020/08/22 11:41:48 UTC

[hudi] branch master updated: [HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d09c02  [HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)
1d09c02 is described below

commit 1d09c02f1cb9a3929995fb6df1bb7d58cdcb3eb3
Author: Shen Hong <sh...@126.com>
AuthorDate: Sat Aug 22 19:41:39 2020 +0800

    [HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)
    
    - To determine the insert bucket location for a given key, Hudi previously walked through all
      insert buckets at O(N) cost; this patch stores cumulative weights alongside the buckets and
      binary-searches them, bringing the cost down to O(logN).
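    - A minimal standalone sketch of the binary-search idea (illustrative only, not the exact Hudi code):

          // cumulativeWeights is ascending, e.g. [0.2, 0.5, 1.0] for bucket weights [0.2, 0.3, 0.5].
          static int pickBucketIndex(double[] cumulativeWeights, double r) {
            int index = java.util.Arrays.binarySearch(cumulativeWeights, r);
            if (index >= 0) {
              return index;                  // r landed exactly on a cumulative weight
            }
            int insertionPoint = -index - 1; // first bucket whose cumulativeWeight exceeds r
            return insertionPoint < cumulativeWeights.length ? insertionPoint : 0;
          }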
---
 .../commit/InsertBucketCumulativeWeightPair.java   | 69 ++++++++++++++++++++++
 .../table/action/commit/UpsertPartitioner.java     | 39 +++++++-----
 .../table/action/commit/TestUpsertPartitioner.java | 51 ++++++++++------
 3 files changed, 125 insertions(+), 34 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java
new file mode 100644
index 0000000..11db69a
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Each InsertBucket has a weight; an InsertBucketCumulativeWeightPair additionally stores the cumulativeWeight of
+ * its InsertBucket. If there are multiple InsertBuckets in a partition, the InsertBuckets are numbered from 1, and
+ * the cumulativeWeight of an InsertBucket is the sum of the weights of the InsertBuckets numbered 1 through its own.
+ *
+ * For example, if there are three InsertBuckets in a partition with the following bucketNumbers and weights:
+ * 1) bucketNumber: 1, weight: 0.2
+ * 2) bucketNumber: 2, weight: 0.3
+ * 3) bucketNumber: 3, weight: 0.5
+ *
+ * The cumulativeWeight of each bucket is then:
+ * 1) bucketNumber: 1, cumulativeWeight: 0.2
+ * 2) bucketNumber: 2, cumulativeWeight: 0.5
+ * 3) bucketNumber: 3, cumulativeWeight: 1.0
+ */
+public class InsertBucketCumulativeWeightPair extends Pair<InsertBucket, Double> {
+  InsertBucket insertBucket;
+  Double cumulativeWeight;
+
+  public InsertBucketCumulativeWeightPair(final InsertBucket insertBucket, final Double cumulativeWeight) {
+    super();
+    this.insertBucket = insertBucket;
+    this.cumulativeWeight = cumulativeWeight;
+  }
+
+  @Override
+  public InsertBucket getLeft() {
+    return insertBucket;
+  }
+
+  @Override
+  public Double getRight() {
+    return cumulativeWeight;
+  }
+
+  @Override
+  public int compareTo(final Pair<InsertBucket, Double> other) {
+    // Only need to compare the cumulativeWeight.
+    return cumulativeWeight.compareTo(other.getRight());
+  }
+
+  @Override
+  public Double setValue(Double value) {
+    this.cumulativeWeight = value;
+    return value;
+  }
+}
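
Because compareTo() above compares only the cumulativeWeight, a probe pair wrapping just the search key
can be handed to Collections.binarySearch. A hypothetical usage sketch (constructor and bucket fields as
used elsewhere in this patch; r is the key's hash fraction):

    List<InsertBucketCumulativeWeightPair> buckets = new ArrayList<>();
    InsertBucket bkt = new InsertBucket();
    bkt.bucketNumber = 1;
    bkt.weight = 0.2;
    buckets.add(new InsertBucketCumulativeWeightPair(bkt, 0.2));
    // ... remaining buckets added with cumulative weights 0.5 and 1.0 ...

    double r = 0.4;
    // The probe's empty InsertBucket is ignored by compareTo; only r participates in the comparison.
    int index = Collections.binarySearch(buckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
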
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 755854b..86fa1bf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -41,6 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.PairFunction;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -77,7 +78,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
   /**
    * Helps us pack inserts into 1 or more buckets depending on number of incoming records.
    */
-  private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
+  private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
   /**
    * Remembers what type each bucket is for later.
    */
@@ -90,7 +91,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
   public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
       HoodieWriteConfig config) {
     updateLocationToBucket = new HashMap<>();
-    partitionPathToInsertBuckets = new HashMap<>();
+    partitionPathToInsertBucketInfos = new HashMap<>();
     bucketInfoMap = new HashMap<>();
     this.profile = profile;
     this.table = table;
@@ -99,7 +100,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     assignInserts(profile, jsc);
 
     LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
-        + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+        + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
         + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
   }
 
@@ -193,15 +194,17 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
         }
 
         // Go over all such buckets, and assign weights as per amount of incoming inserts.
-        List<InsertBucket> insertBuckets = new ArrayList<>();
+        List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
+        double currentCumulativeWeight = 0;
         for (int i = 0; i < bucketNumbers.size(); i++) {
           InsertBucket bkt = new InsertBucket();
           bkt.bucketNumber = bucketNumbers.get(i);
           bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
-          insertBuckets.add(bkt);
+          currentCumulativeWeight += bkt.weight;
+          insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, currentCumulativeWeight));
         }
         LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
-        partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
+        partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
       }
     }
   }
@@ -252,8 +255,8 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     return bucketInfoMap.get(bucketNumber);
   }
 
-  public List<InsertBucket> getInsertBuckets(String partitionPath) {
-    return partitionPathToInsertBuckets.get(partitionPath);
+  public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
+    return partitionPathToInsertBucketInfos.get(partitionPath);
   }
 
   @Override
@@ -270,20 +273,24 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
       return updateLocationToBucket.get(location.getFileId());
     } else {
       String partitionPath = keyLocation._1().getPartitionPath();
-      List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
+      List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
       // pick the target bucket to use based on the weights.
-      double totalWeight = 0.0;
       final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
-      for (InsertBucket insertBucket : targetBuckets) {
-        totalWeight += insertBucket.weight;
-        if (r <= totalWeight) {
-          return insertBucket.bucketNumber;
-        }
+
+      int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+
+      if (index >= 0) {
+        return targetBuckets.get(index).getKey().bucketNumber;
       }
+
+      if ((-1 * index - 1) < targetBuckets.size()) {
+        return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
+      }
+
       // return first one, by default
-      return targetBuckets.get(0).bucketNumber;
+      return targetBuckets.get(0).getKey().bucketNumber;
     }
   }
 
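To make the negative return value concrete: with the example cumulative weights [0.2, 0.5, 1.0] from the
new class's Javadoc, a key hashing to r = 0.4 has no exact match, so Collections.binarySearch returns
-(insertion point) - 1 = -(1) - 1 = -2. The code above then recovers the insertion point as
-1 * (-2) - 1 = 1, selecting the bucket with cumulativeWeight 0.5, i.e. the first bucket whose
cumulative weight exceeds r.
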
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index f49d6d5..b8df5ef 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -180,7 +180,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates... Check all updates go together & inserts subsplit
     UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
     assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
   }
 
@@ -201,20 +201,18 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
 
     WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords));
     UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     float bucket0Weight = 0.2f;
-    InsertBucket newInsertBucket0 = new InsertBucket();
-    newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
-    newInsertBucket0.weight = bucket0Weight;
-    insertBuckets.remove(0);
-    insertBuckets.add(0, newInsertBucket0);
-
-    InsertBucket newInsertBucket1 = new InsertBucket();
-    newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
-    newInsertBucket1.weight = 1 - bucket0Weight;
-    insertBuckets.remove(1);
-    insertBuckets.add(1, newInsertBucket1);
+    InsertBucketCumulativeWeightPair pair = insertBuckets.remove(0);
+    pair.getKey().weight = bucket0Weight;
+    pair.setValue(Double.valueOf(bucket0Weight));
+    insertBuckets.add(0, pair);
+
+    InsertBucketCumulativeWeightPair pair1 = insertBuckets.remove(1);
+    pair1.getKey().weight = 1 - bucket0Weight;
+    pair1.setValue(Double.valueOf(1.0));
+    insertBuckets.add(1, pair1);
 
     Map<Integer, Integer> partition2numRecords = new HashMap<Integer, Integer>();
     for (HoodieRecord hoodieRecord: insertRecords) {
@@ -238,13 +236,26 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
             "The weight of bucket1 should be " + (1 - bucket0Weight));
   }
 
+  private void assertInsertBuckets(Double[] weights,
+                                   Double[] cumulativeWeights,
+                                   List<InsertBucketCumulativeWeightPair> insertBuckets) {
+    for (int i = 0; i < weights.length; i++) {
+      assertEquals(i, insertBuckets.get(i).getKey().bucketNumber,
+          String.format("Insert bucket %d should have bucketNumber %d", i, i));
+      assertEquals(weights[i], insertBuckets.get(i).getKey().weight, 0.01,
+          String.format("Insert bucket %d should have weight %.2f", i, weights[i]));
+      assertEquals(cumulativeWeights[i], insertBuckets.get(i).getValue(), 0.01,
+          String.format("Insert bucket %d should have cumulativeWeight %.2f", i, cumulativeWeights[i]));
+    }
+  }
+
   @Test
   public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
     // smallest file
     UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
-    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+    List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
 
     assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
     assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -254,8 +265,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
         "Bucket 2 is INSERT");
     assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
-    assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
-    assertEquals(0.5, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+    Double[] weights = { 0.5, 0.25, 0.25};
+    Double[] cumulativeWeights = { 0.5, 0.75, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
 
     // Now with insert split size auto tuned
     partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
@@ -271,8 +284,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType,
         "Bucket 3 is INSERT");
     assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
-    assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
-    assertEquals(200.0 / 2400, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+    weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
+    cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+    assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
   }
 
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {