Posted to commits@hudi.apache.org by si...@apache.org on 2020/07/23 12:32:00 UTC

[hudi] branch master updated: [HUDI-1082] Fix minor bug in deciding the insert buckets (#1838)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c3279cd  [HUDI-1082] Fix minor bug in deciding the insert buckets (#1838)
c3279cd is described below

commit c3279cd5989805946267b046007ea23ba4b615c2
Author: Shen Hong <sh...@126.com>
AuthorDate: Thu Jul 23 20:31:49 2020 +0800

    [HUDI-1082] Fix minor bug in deciding the insert buckets (#1838)
---
 .../table/action/commit/UpsertPartitioner.java     | 11 +++--
 .../table/action/commit/TestUpsertPartitioner.java | 57 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 8857fc3..755854b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -67,9 +67,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
    */
   private int totalBuckets = 0;
   /**
-   * Stat for the current workload. Helps in determining total inserts, upserts etc.
+   * Stat for the current workload. Helps in determining inserts, upserts etc.
    */
-  private WorkloadStat globalStat;
+  private WorkloadProfile profile;
   /**
    * Helps decide which bucket an incoming update should go to.
    */
@@ -92,7 +92,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
     updateLocationToBucket = new HashMap<>();
     partitionPathToInsertBuckets = new HashMap<>();
     bucketInfoMap = new HashMap<>();
-    globalStat = profile.getGlobalStat();
+    this.profile = profile;
     this.table = table;
     this.config = config;
     assignUpdates(profile);
@@ -269,10 +269,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
       HoodieRecordLocation location = keyLocation._2().get();
       return updateLocationToBucket.get(location.getFileId());
     } else {
-      List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(keyLocation._1().getPartitionPath());
+      String partitionPath = keyLocation._1().getPartitionPath();
+      List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
       // pick the target bucket to use based on the weights.
       double totalWeight = 0.0;
-      final long totalInserts = Math.max(1, globalStat.getNumInserts());
+      final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
       final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
       final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
       for (InsertBucket insertBucket : targetBuckets) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 09be8f1..c526ad1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -45,14 +45,17 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import scala.Tuple2;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
 import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -180,6 +183,60 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
   }
 
   @Test
+  public void testPartitionWeight() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    int totalInsertNum = 2000;
+
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
+            .insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();
+
+    HoodieClientTestUtils.fakeCommit(basePath, "001");
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", totalInsertNum);
+
+    WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords));
+    UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
+    List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+
+    float bucket0Weight = 0.2f;
+    InsertBucket newInsertBucket0 = new InsertBucket();
+    newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
+    newInsertBucket0.weight = bucket0Weight;
+    insertBuckets.remove(0);
+    insertBuckets.add(0, newInsertBucket0);
+
+    InsertBucket newInsertBucket1 = new InsertBucket();
+    newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
+    newInsertBucket1.weight = 1 - bucket0Weight;
+    insertBuckets.remove(1);
+    insertBuckets.add(1, newInsertBucket1);
+
+    Map<Integer, Integer> partition2numRecords = new HashMap<>();
+    for (HoodieRecord hoodieRecord: insertRecords) {
+      int partition = partitioner.getPartition(new Tuple2<>(
+              hoodieRecord.getKey(), Option.ofNullable(hoodieRecord.getCurrentLocation())));
+      if (!partition2numRecords.containsKey(partition)) {
+        partition2numRecords.put(partition, 0);
+      }
+      partition2numRecords.put(partition, partition2numRecords.get(partition) + 1);
+    }
+
+    assertTrue(partition2numRecords.get(0) < partition2numRecords.get(1),
+            "The insert num of bucket1 should be more than that of bucket0");
+    assertTrue(partition2numRecords.get(0) + partition2numRecords.get(1) == totalInsertNum,
+            "The total number of insert records should be " + totalInsertNum);
+    assertEquals(String.valueOf(bucket0Weight),
+            String.format("%.1f", (partition2numRecords.get(0) * 1.0f / totalInsertNum)),
+            "The weight of bucket0 should be " + bucket0Weight);
+    assertEquals(String.valueOf(1 - bucket0Weight),
+            String.format("%.1f", (partition2numRecords.get(1) * 1.0f / totalInsertNum)),
+            "The weight of bucket1 should be " + (1 - bucket0Weight));
+  }
+
+  @Test
   public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
     final String testPartitionPath = "2016/09/26";
     // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding