You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/08/22 11:41:48 UTC
[hudi] branch master updated: [HUDI-1083] Optimization in
determining insert bucket location for a given key (#1868)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1d09c02 [HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)
1d09c02 is described below
commit 1d09c02f1cb9a3929995fb6df1bb7d58cdcb3eb3
Author: Shen Hong <sh...@126.com>
AuthorDate: Sat Aug 22 19:41:39 2020 +0800
[HUDI-1083] Optimization in determining insert bucket location for a given key (#1868)
- To determine the insert bucket location for a given key, Hudi previously walked through all insert buckets at O(N) cost; this patch adds an optimization (a binary search over the cumulative bucket weights) that makes the lookup O(log N).
---
.../commit/InsertBucketCumulativeWeightPair.java | 69 ++++++++++++++++++++++
.../table/action/commit/UpsertPartitioner.java | 39 +++++++-----
.../table/action/commit/TestUpsertPartitioner.java | 51 ++++++++++------
3 files changed, 125 insertions(+), 34 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java
new file mode 100644
index 0000000..11db69a
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertBucketCumulativeWeightPair.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Each InsertBucket has a weight; an InsertBucketCumulativeWeightPair pairs an InsertBucket with its
+ * cumulativeWeight. If there are multiple InsertBuckets in a partition, the InsertBuckets are numbered from 1, and
+ * the cumulativeWeight of an InsertBucket is the sum of the InsertBucket weights from number 1 to its own number.
+ *
+ * Example: there are three InsertBuckets in a partition; their bucketNumbers and weights are:
+ * 1) bucketNumber: 1, weight: 0.2
+ * 2) bucketNumber: 2, weight: 0.3
+ * 3) bucketNumber: 3, weight: 0.5
+ *
+ * The cumulativeWeight of each bucket is:
+ * 1) bucketNumber: 1, cumulativeWeight: 0.2
+ * 2) bucketNumber: 2, cumulativeWeight: 0.5
+ * 3) bucketNumber: 3, cumulativeWeight: 1.0
+ */
+public class InsertBucketCumulativeWeightPair extends Pair<InsertBucket, Double> { // Pair of an InsertBucket (left) and its cumulative weight (right)
+ InsertBucket insertBucket; // left element, returned by getLeft()
+ Double cumulativeWeight; // right element, returned by getRight(); the only field compareTo() looks at
+
+ public InsertBucketCumulativeWeightPair(final InsertBucket insertBucket, final Double cumulativeWeight) { // stores both arguments as given; no null checks
+ super();
+ this.insertBucket = insertBucket;
+ this.cumulativeWeight = cumulativeWeight;
+ }
+
+ @Override
+ public InsertBucket getLeft() { // returns the wrapped InsertBucket
+ return insertBucket;
+ }
+
+ @Override
+ public Double getRight() { // returns the cumulative weight
+ return cumulativeWeight;
+ }
+
+ @Override
+ public int compareTo(final Pair<InsertBucket, Double> other) { // orders pairs by cumulative weight; the InsertBucket itself is ignored
+ // Only need to compare the cumulativeWeight.
+ return cumulativeWeight.compareTo(other.getRight()); // throws NullPointerException if either cumulative weight is null
+ }
+
+ @Override
+ public Double setValue(Double value) { // replaces the cumulative weight; the InsertBucket is left untouched
+ this.cumulativeWeight = value;
+ return value; // NOTE(review): returns the NEW value; if Pair implements Map.Entry, setValue is specified to return the previous value -- confirm
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 755854b..86fa1bf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -41,6 +41,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -77,7 +78,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
/**
* Helps us pack inserts into 1 or more buckets depending on number of incoming records.
*/
- private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets;
+ private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
/**
* Remembers what type each bucket is for later.
*/
@@ -90,7 +91,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
public UpsertPartitioner(WorkloadProfile profile, JavaSparkContext jsc, HoodieTable<T> table,
HoodieWriteConfig config) {
updateLocationToBucket = new HashMap<>();
- partitionPathToInsertBuckets = new HashMap<>();
+ partitionPathToInsertBucketInfos = new HashMap<>();
bucketInfoMap = new HashMap<>();
this.profile = profile;
this.table = table;
@@ -99,7 +100,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
assignInserts(profile, jsc);
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
- + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+ + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
}
@@ -193,15 +194,17 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
}
// Go over all such buckets, and assign weights as per amount of incoming inserts.
- List<InsertBucket> insertBuckets = new ArrayList<>();
+ List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
+ double curentCumulativeWeight = 0;
for (int i = 0; i < bucketNumbers.size(); i++) {
InsertBucket bkt = new InsertBucket();
bkt.bucketNumber = bucketNumbers.get(i);
bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
- insertBuckets.add(bkt);
+ curentCumulativeWeight += bkt.weight;
+ insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, curentCumulativeWeight));
}
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
- partitionPathToInsertBuckets.put(partitionPath, insertBuckets);
+ partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
}
}
}
@@ -252,8 +255,8 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
return bucketInfoMap.get(bucketNumber);
}
- public List<InsertBucket> getInsertBuckets(String partitionPath) {
- return partitionPathToInsertBuckets.get(partitionPath);
+ public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
+ return partitionPathToInsertBucketInfos.get(partitionPath);
}
@Override
@@ -270,20 +273,24 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends Partiti
return updateLocationToBucket.get(location.getFileId());
} else {
String partitionPath = keyLocation._1().getPartitionPath();
- List<InsertBucket> targetBuckets = partitionPathToInsertBuckets.get(partitionPath);
+ List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
// pick the target bucket to use based on the weights.
- double totalWeight = 0.0;
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation._1().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
- for (InsertBucket insertBucket : targetBuckets) {
- totalWeight += insertBucket.weight;
- if (r <= totalWeight) {
- return insertBucket.bucketNumber;
- }
+
+ int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+
+ if (index >= 0) {
+ return targetBuckets.get(index).getKey().bucketNumber;
}
+
+ if ((-1 * index - 1) < targetBuckets.size()) {
+ return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
+ }
+
// return first one, by default
- return targetBuckets.get(0).bucketNumber;
+ return targetBuckets.get(0).getKey().bucketNumber;
}
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index f49d6d5..b8df5ef 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -180,7 +180,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
final String testPartitionPath = "2016/09/26";
// Inserts + Updates... Check all updates go together & inserts subsplit
UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
- List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+ List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
assertEquals(2, insertBuckets.size(), "Total of 2 insert buckets");
}
@@ -201,20 +201,18 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
WorkloadProfile profile = new WorkloadProfile(jsc.parallelize(insertRecords));
UpsertPartitioner partitioner = new UpsertPartitioner(profile, jsc, table, config);
- List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+ List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
float bucket0Weight = 0.2f;
- InsertBucket newInsertBucket0 = new InsertBucket();
- newInsertBucket0.bucketNumber = insertBuckets.get(0).bucketNumber;
- newInsertBucket0.weight = bucket0Weight;
- insertBuckets.remove(0);
- insertBuckets.add(0, newInsertBucket0);
-
- InsertBucket newInsertBucket1 = new InsertBucket();
- newInsertBucket1.bucketNumber = insertBuckets.get(1).bucketNumber;
- newInsertBucket1.weight = 1 - bucket0Weight;
- insertBuckets.remove(1);
- insertBuckets.add(1, newInsertBucket1);
+ InsertBucketCumulativeWeightPair pair = insertBuckets.remove(0);
+ pair.getKey().weight = bucket0Weight;
+ pair.setValue(new Double(bucket0Weight));
+ insertBuckets.add(0, pair);
+
+ InsertBucketCumulativeWeightPair pair1 = insertBuckets.remove(1);
+ pair1.getKey().weight = 1 - bucket0Weight;
+ pair1.setValue(new Double(1));
+ insertBuckets.add(1, pair1);
Map<Integer, Integer> partition2numRecords = new HashMap<Integer, Integer>();
for (HoodieRecord hoodieRecord: insertRecords) {
@@ -238,13 +236,26 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
"The weight of bucket1 should be " + (1 - bucket0Weight));
}
+ private void assertInsertBuckets(Double[] weights, // expected per-bucket weights, indexed by position in insertBuckets
+ Double[] cumulativeWeights, // expected cumulative weights, same indexing as weights
+ List<InsertBucketCumulativeWeightPair> insertBuckets) { // actual buckets under test
+ for (int i = 0; i < weights.length; i++) { // only the first weights.length entries of insertBuckets are checked
+ assertEquals(i, insertBuckets.get(i).getKey().bucketNumber, // the bucket at position i must carry bucketNumber i
+ String.format("BucketNumber of insert bucket %d must be same as %d", i, i));
+ assertEquals(weights[i], insertBuckets.get(i).getKey().weight, 0.01, // 0.01 is the delta argument passed to assertEquals
+ String.format("Insert bucket %d should have weight %.1f", i, weights[i]));
+ assertEquals(cumulativeWeights[i], insertBuckets.get(i).getValue(), 0.01, // checks getValue() against the expected cumulative weight, same delta
+ String.format("Insert bucket %d should have cumulativeWeight %.1f", i, cumulativeWeights[i]));
+ }
+ }
+
@Test
public void testUpsertPartitionerWithSmallInsertHandling() throws Exception {
final String testPartitionPath = "2016/09/26";
// Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
// smallest file
UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath, false);
- List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
+ List<InsertBucketCumulativeWeightPair> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
assertEquals(3, partitioner.numPartitions(), "Should have 3 partitions");
assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -254,8 +265,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
assertEquals(BucketType.INSERT, partitioner.getBucketInfo(2).bucketType,
"Bucket 2 is INSERT");
assertEquals(3, insertBuckets.size(), "Total of 3 insert buckets");
- assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
- assertEquals(0.5, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+ Double[] weights = { 0.5, 0.25, 0.25};
+ Double[] cumulativeWeights = { 0.5, 0.75, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
// Now with insert split size auto tuned
partitioner = getUpsertPartitioner(1000 * 1024, 2400, 100, 800 * 1024, testPartitionPath, true);
@@ -271,8 +284,10 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
assertEquals(BucketType.INSERT, partitioner.getBucketInfo(3).bucketType,
"Bucket 3 is INSERT");
assertEquals(4, insertBuckets.size(), "Total of 4 insert buckets");
- assertEquals(0, insertBuckets.get(0).bucketNumber, "First insert bucket must be same as update bucket");
- assertEquals(200.0 / 2400, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5");
+
+ weights = new Double[] { 0.08, 0.31, 0.31, 0.31};
+ cumulativeWeights = new Double[] { 0.08, 0.39, 0.69, 1.0};
+ assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
}
private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {