Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/20 15:38:25 UTC

[incubator-hudi] branch master updated: [HUDI-772] Make UserDefinedBulkInsertPartitioner configurable for DataSource (#1500)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ddd105b  [HUDI-772] Make UserDefinedBulkInsertPartitioner configurable for DataSource (#1500)
ddd105b is described below

commit ddd105bb3119174b613c6917ee25795f2939f430
Author: Dongwook <kw...@users.noreply.github.com>
AuthorDate: Mon Apr 20 08:38:18 2020 -0700

    [HUDI-772] Make UserDefinedBulkInsertPartitioner configurable for DataSource (#1500)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 10 +++
 .../main/java/org/apache/hudi/DataSourceUtils.java | 27 ++++++-
 hudi-spark/src/test/java/DataSourceTestUtils.java  | 13 ++++
 hudi-spark/src/test/java/DataSourceUtilsTest.java  | 86 ++++++++++++++++++++++
 4 files changed, 134 insertions(+), 2 deletions(-)
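
For context on what is being made pluggable here: a UserDefinedBulkInsertPartitioner takes the incoming JavaRDD of HoodieRecords and returns a repartitioned (and typically sorted) RDD before bulk insert writes files. Below is a minimal sketch of a custom implementation; the class name SortByRecordKeyPartitioner is hypothetical, and the interface shape is taken from the NoOpBulkInsertPartitioner test helper added in this commit.

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    // Hypothetical example; it needs a public no-arg constructor because
    // DataSourceUtils instantiates the class reflectively by name.
    public class SortByRecordKeyPartitioner<T extends HoodieRecordPayload>
        implements UserDefinedBulkInsertPartitioner<T> {

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                         int outputSparkPartitions) {
        // Globally sort by record key so each output file covers a
        // contiguous key range.
        return records.sortBy(HoodieRecord::getRecordKey, true, outputSparkPartitions);
      }
    }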

diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5ac87da..50af725 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -57,6 +57,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_PARALLELISM = "1500";
   private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
   private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
+  private static final String BULKINSERT_USER_DEFINED_PARTITIONER_CLASS = "hoodie.bulkinsert.user.defined.partitioner.class";
   private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
   private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
   private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
@@ -157,6 +158,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(BULKINSERT_PARALLELISM));
   }
 
+  public String getUserDefinedBulkInsertPartitionerClass() {
+    return props.getProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS);
+  }
+
   public int getInsertShuffleParallelism() {
     return Integer.parseInt(props.getProperty(INSERT_PARALLELISM));
   }
@@ -603,6 +608,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withUserDefinedBulkInsertPartitionerClass(String className) {
+      props.setProperty(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS, className);
+      return this;
+    }
+
     public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
       props.setProperty(INSERT_PARALLELISM, String.valueOf(insertShuffleParallelism));
       props.setProperty(UPSERT_PARALLELISM, String.valueOf(upsertShuffleParallelism));
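
The builder hook added above can be exercised directly when assembling a write config. A minimal sketch, using the hypothetical partitioner class from earlier and an illustrative base path:

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips")  // illustrative base path
        .withUserDefinedBulkInsertPartitionerClass(
            "com.example.SortByRecordKeyPartitioner")  // hypothetical class
        .build();
    // If the property is never set, getUserDefinedBulkInsertPartitionerClass()
    // returns null and bulk insert falls back to the default partitioning.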
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 7a4caac..34f2ef2 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -36,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
 
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -153,6 +156,24 @@ public class DataSourceUtils {
   }
 
   /**
+   * Create a UserDefinedBulkInsertPartitioner instance via reflection,
+   * <br>
+   * if its class name is configured through HoodieWriteConfig.
+   * @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass()
+   */
+  private static Option<UserDefinedBulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config)
+          throws HoodieException {
+    String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass();
+    try {
+      return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
+              ? Option.empty() :
+              Option.of((UserDefinedBulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+    } catch (Throwable e) {
+      throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
+    }
+  }
+
+  /**
    * Create a payload class via reflection, passing in an ordering/precombine value.
    */
   public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
@@ -196,9 +217,11 @@ public class DataSourceUtils {
   }
 
   public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
-                                                      String instantTime, String operation) {
+                                                      String instantTime, String operation) throws HoodieException {
     if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
-      return client.bulkInsert(hoodieRecords, instantTime);
+      Option<UserDefinedBulkInsertPartitioner> userDefinedBulkInsertPartitioner =
+          createUserDefinedBulkInsertPartitioner(client.getConfig());
+      return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner);
     } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
       return client.insert(hoodieRecords, instantTime);
     } else {
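
On the Spark DataSource side, the partitioner class name travels to HoodieWriteConfig as an ordinary write option, keyed by the hoodie.bulkinsert.user.defined.partitioner.class property introduced above. A hedged end-to-end sketch; the table name, field names, and paths below are illustrative:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class BulkInsertExample {
      // df and the option values below are illustrative.
      public static void bulkInsert(Dataset<Row> df) {
        df.write().format("hudi")
            .option("hoodie.table.name", "trips")
            .option("hoodie.datasource.write.recordkey.field", "uuid")
            .option("hoodie.datasource.write.partitionpath.field", "region")
            .option("hoodie.datasource.write.operation", "bulk_insert")
            // New in this commit: resolved reflectively in DataSourceUtils.
            .option("hoodie.bulkinsert.user.defined.partitioner.class",
                "com.example.SortByRecordKeyPartitioner")  // hypothetical class
            .mode(SaveMode.Append)
            .save("/tmp/hudi_trips");
      }
    }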
diff --git a/hudi-spark/src/test/java/DataSourceTestUtils.java b/hudi-spark/src/test/java/DataSourceTestUtils.java
index 036e6c2..0d801bb 100644
--- a/hudi-spark/src/test/java/DataSourceTestUtils.java
+++ b/hudi-spark/src/test/java/DataSourceTestUtils.java
@@ -19,7 +19,10 @@
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
 import java.util.List;
@@ -52,4 +55,14 @@ public class DataSourceTestUtils {
         .map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
         .collect(Collectors.toList());
   }
+
+  public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
+          implements UserDefinedBulkInsertPartitioner<T> {
+
+    @Override
+    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
+      return records;
+    }
+  }
+
 }
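
The no-op partitioner above gives the tests a concrete class to resolve through the reflection path. A minimal sketch of that resolution step, mirroring createUserDefinedBulkInsertPartitioner from the DataSourceUtils hunk:

    import org.apache.hudi.common.util.ReflectionUtils;
    import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;

    public class ReflectionResolutionSketch {
      public static void main(String[] args) {
        // ReflectionUtils.loadClass(name) returns a new instance built with
        // the class's no-arg constructor, hence the constructor requirement.
        UserDefinedBulkInsertPartitioner partitioner =
            (UserDefinedBulkInsertPartitioner) ReflectionUtils.loadClass(
                DataSourceTestUtils.NoOpBulkInsertPartitioner.class.getName());
        System.out.println("Loaded " + partitioner.getClass().getSimpleName());
      }
    }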
diff --git a/hudi-spark/src/test/java/DataSourceUtilsTest.java b/hudi-spark/src/test/java/DataSourceUtilsTest.java
index 4bacb7c..c14b852 100644
--- a/hudi-spark/src/test/java/DataSourceUtilsTest.java
+++ b/hudi-spark/src/test/java/DataSourceUtilsTest.java
@@ -17,18 +17,58 @@
  */
 
 import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import java.time.LocalDate;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 public class DataSourceUtilsTest {
 
+  @Mock
+  private HoodieWriteClient hoodieWriteClient;
+
+  @Mock
+  private JavaRDD<HoodieRecord> hoodieRecords;
+
+  @Captor
+  private ArgumentCaptor<Option> optionCaptor;
+  private HoodieWriteConfig config;
+
+  @BeforeEach
+  public void setUp() {
+    config = HoodieWriteConfig.newBuilder().withPath("/").build();
+  }
+
   @Test
   public void testAvroRecordsFieldConversion() {
     // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
@@ -59,4 +99,50 @@ public class DataSourceUtilsTest {
     assertEquals("Hudi Meetup", DataSourceUtils.getNestedFieldValAsString(record, "event_name", true));
     assertEquals("Hudi PMC", DataSourceUtils.getNestedFieldValAsString(record, "event_organizer", true));
   }
+
+  @Test
+  public void testDoWriteOperationWithoutUserDefinedBulkInsertPartitioner() throws HoodieException {
+    when(hoodieWriteClient.getConfig()).thenReturn(config);
+
+    DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
+            DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+
+    verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
+            optionCaptor.capture());
+    assertThat(optionCaptor.getValue(), is(equalTo(Option.empty())));
+  }
+
+  @Test
+  public void testDoWriteOperationWithNonExistUserDefinedBulkInsertPartitioner() throws HoodieException {
+    setAndVerifyHoodieWriteClientWith("NonExistClassName");
+
+    Exception exception = assertThrows(HoodieException.class, () -> {
+      DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
+              DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+    });
+
+    assertThat(exception.getMessage(), containsString("Could not create UserDefinedBulkInsertPartitioner"));
+  }
+
+  @Test
+  public void testDoWriteOperationWithUserDefinedBulkInsertPartitioner() throws HoodieException {
+    setAndVerifyHoodieWriteClientWith(DataSourceTestUtils.NoOpBulkInsertPartitioner.class.getName());
+
+    DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
+            DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+
+    verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
+            optionCaptor.capture());
+    assertThat(optionCaptor.getValue().get(), is(instanceOf(DataSourceTestUtils.NoOpBulkInsertPartitioner.class)));
+  }
+
+  private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
+    config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
+            .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
+            .build();
+    when(hoodieWriteClient.getConfig()).thenReturn(config);
+
+    assertThat(config.getUserDefinedBulkInsertPartitionerClass(), is(equalTo(partitionerClassName)));
+  }
+
 }