Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/20 15:38:25 UTC
[incubator-hudi] branch master updated: [HUDI-772] Make UserDefinedBulkInsertPartitioner configurable for DataSource (#1500)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ddd105b [HUDI-772] Make UserDefinedBulkInsertPartitioner configurable for DataSource (#1500)
ddd105b is described below
commit ddd105bb3119174b613c6917ee25795f2939f430
Author: Dongwook <kw...@users.noreply.github.com>
AuthorDate: Mon Apr 20 08:38:18 2020 -0700
[HUDI-772] Make UserDefinedBulkInsertPartitioner configurable for DataSource (#1500)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 10 +++
.../main/java/org/apache/hudi/DataSourceUtils.java | 27 ++++++-
hudi-spark/src/test/java/DataSourceTestUtils.java | 13 ++++
hudi-spark/src/test/java/DataSourceUtilsTest.java | 86 ++++++++++++++++++++++
4 files changed, 134 insertions(+), 2 deletions(-)
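[Editor's note] For context, this change lets DataSource users plug in their own bulk-insert partitioner by class name. Below is a minimal, hypothetical implementation of the UserDefinedBulkInsertPartitioner interface that appears in this diff; the class name SortByPartitionPathPartitioner is illustrative only, and it assumes the implementation is instantiated reflectively through a public no-arg constructor.

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    // Hypothetical partitioner: sorts records by partition path so each
    // output Spark partition (and hence each written file) covers a
    // narrow range of partition paths.
    public class SortByPartitionPathPartitioner<T extends HoodieRecordPayload>
        implements UserDefinedBulkInsertPartitioner<T> {

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                         int outputSparkPartitions) {
        // JavaRDD.sortBy takes a key-extractor function, the sort order,
        // and the target number of partitions.
        return records.sortBy(HoodieRecord::getPartitionPath, true, outputSparkPartitions);
      }
    }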
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5ac87da..50af725 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -57,6 +57,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_PARALLELISM = "1500";
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
+ private static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
@@ -157,6 +158,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(BULKINSERT_PARALLELISM));
}
+ public String getUserDefinedBulkInsertPartitionerClass() {
+ return props.getProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS);
+ }
+
public int getInsertShuffleParallelism() {
return Integer.parseInt(props.getProperty(INSERT_PARALLELISM));
}
@@ -603,6 +608,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withUserDefinedBulkInsertPartitionerClass(String className) {
+ props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className);
+ return this;
+ }
+
public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
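[Editor's note] The builder method added above is the programmatic way to set the new hoodie.bulkinsert.user.defined.partitioner.class property. A minimal sketch of wiring it through HoodieWriteConfig follows; the base path and partitioner class name are placeholders.

    import org.apache.hudi.config.HoodieWriteConfig;

    public class WriteConfigExample {
      public static void main(String[] args) {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")  // placeholder base path
            .withUserDefinedBulkInsertPartitionerClass("com.example.MyPartitioner")
            .build();

        // Returns null when the property was never set; DataSourceUtils
        // treats a null/empty value as "use the default bulk-insert path".
        System.out.println(writeConfig.getUserDefinedBulkInsertPartitionerClass());
      }
    }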
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 7a4caac..34f2ef2 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -36,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -153,6 +156,24 @@ public class DataSourceUtils {
}
/**
+ * Create an instance of UserDefinedBulkInsertPartitioner via reflection,
+ * <br>
+ * if the class name of a UserDefinedBulkInsertPartitioner implementation is configured through the HoodieWriteConfig.
+ * @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass()
+ */
+ private static Option<UserDefinedBulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
+ throws HoodieException {
+ String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
+ try {
+ return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
+ ? Option.empty() :
+ Option.of((UserDefinedBulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+ } catch (Throwable e) {
+ throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
+ }
+ }
+
+ /**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
@@ -196,9 +217,11 @@ public class DataSourceUtils {
}
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
- String instantTime, String operation) {
+ String instantTime, String operation) throws HoodieException {
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
- return client.bulkInsert(hoodieRecords, instantTime);
+ Option<UserDefinedBulkInsertPartitioner> userDefinedBulkInsertPartitioner =
+ createUserDefinedBulkInsertPartitioner(client.getConfig());
+ return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
return client.insert(hoodieRecords, instantTime);
} else {
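[Editor's note] Because the partitioner is resolved from HoodieWriteConfig, Spark DataSource writers can enable it purely through options. The sketch below assumes the standard Hudi DataSource option keys; the table name, path, and partitioner class are placeholders, and required options such as the record key and precombine field are omitted for brevity.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class BulkInsertExample {
      static void bulkInsert(Dataset<Row> df) {  // df prepared elsewhere
        df.write()
            .format("org.apache.hudi")
            .option("hoodie.datasource.write.operation", "bulk_insert")
            .option("hoodie.bulkinsert.user.defined.partitioner.class",
                    "com.example.MyPartitioner")     // placeholder class
            .option("hoodie.table.name", "example_table")  // placeholder
            .mode(SaveMode.Append)
            .save("/tmp/hudi_table");                // placeholder path
      }
    }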
diff --git a/hudi-spark/src/test/java/DataSourceTestUtils.java b/hudi-spark/src/test/java/DataSourceTestUtils.java
index 036e6c2..0d801bb 100644
--- a/hudi-spark/src/test/java/DataSourceTestUtils.java
+++ b/hudi-spark/src/test/java/DataSourceTestUtils.java
@@ -19,7 +19,10 @@
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.List;
@@ -52,4 +55,14 @@ public class DataSourceTestUtils {
.map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
.collect(Collectors.toList());
}
+
+ public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
+ implements UserDefinedBulkInsertPartitioner<T> {
+
+ @Override
+ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
+ return records;
+ }
+ }
+
}
diff --git a/hudi-spark/src/test/java/DataSourceUtilsTest.java b/hudi-spark/src/test/java/DataSourceUtilsTest.java
index 4bacb7c..c14b852 100644
--- a/hudi-spark/src/test/java/DataSourceUtilsTest.java
+++ b/hudi-spark/src/test/java/DataSourceUtilsTest.java
@@ -17,18 +17,58 @@
*/
import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDate;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+@ExtendWith(MockitoExtension.class)
public class DataSourceUtilsTest {
+ @Mock
+ private HoodieWriteClient hoodieWriteClient;
+
+ @Mock
+ private JavaRDD<HoodieRecord> hoodieRecords;
+
+ @Captor
+ private ArgumentCaptor<Option> optionCaptor;
+ private HoodieWriteConfig config;
+
+ @BeforeEach
+ public void setUp() {
+ config = HoodieWriteConfig.newBuilder().withPath("/").build();
+ }
+
@Test
public void testAvroRecordsFieldConversion() {
// There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
@@ -59,4 +99,50 @@ public class DataSourceUtilsTest {
assertEquals("Hudi Meetup", DataSourceUtils.getNestedFieldValAsString(record, "event_name", true));
assertEquals("Hudi PMC", DataSourceUtils.getNestedFieldValAsString(record, "event_organizer", true));
}
+
+ @Test
+ public void testDoWriteOperationWithoutUserDefinedBulkInsertPartitioner() throws HoodieException {
+ when(hoodieWriteClient.getConfig()).thenReturn(config);
+
+ DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
+ DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+
+ verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
+ optionCaptor.capture());
+ assertThat(optionCaptor.getValue(), is(equalTo(Option.empty())));
+ }
+
+ @Test
+ public void testDoWriteOperationWithNonExistUserDefinedBulkInsertPartitioner() throws HoodieException {
+ setAndVerifyHoodieWriteClientWith("NonExistClassName");
+
+ Exception exception = assertThrows(HoodieException.class, () -> {
+ DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
+ DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+ });
+
+ assertThat(exception.getMessage(), containsString("Could not create UserDefinedBulkInsertPartitioner"));
+ }
+
+ @Test
+ public void testDoWriteOperationWithUserDefinedBulkInsertPartitioner() throws HoodieException {
+ setAndVerifyHoodieWriteClientWith(DataSourceTestUtils.NoOpBulkInsertPartitioner.class.getName());
+
+ DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
+ DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+
+ verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
+ optionCaptor.capture());
+ assertThat(optionCaptor.getValue().get(), is(instanceOf(DataSourceTestUtils.NoOpBulkInsertPartitioner.class)));
+ }
+
+ private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
+ config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
+ .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
+ .build();
+ when(hoodieWriteClient.getConfig()).thenReturn(config);
+
+ assertThat(config.getUserDefinedBulkInsertPartitionerClass(), is(equalTo(partitionerClassName)));
+ }
+
}