You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2021/08/24 13:45:41 UTC

[hudi] branch master updated: [HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new de94787  [HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523)
de94787 is described below

commit de94787a85b272f79181dff73907b0f20855ee78
Author: zhangyue19921010 <69...@users.noreply.github.com>
AuthorDate: Tue Aug 24 21:45:17 2021 +0800

    [HUDI-2345] Hoodie columns sort partitioner for bulk insert (#3523)
    
    
    Co-authored-by: yuezhang <yu...@freewheel.tv>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 18 +++++++-
 .../RDDCustomColumnsSortPartitioner.java           | 10 ++++
 .../TestBulkInsertInternalPartitioner.java         | 18 +++++++-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  4 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  | 54 +++++++++++++++-------
 5 files changed, 83 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d16d417..04660fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -158,11 +158,18 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("For large initial imports using bulk_insert operation, controls the parallelism to use for sort modes or custom partitioning done"
           + "before writing records to the table.");
 
+  public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty
+          .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
+          .noDefaultValue()
+          .withDocumentation("Columns to sort the data by when using org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as the user defined partitioner during bulk_insert. "
+                  + "For example 'column1,column2'");
+
   public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty
       .key("hoodie.bulkinsert.user.defined.partitioner.class")
       .noDefaultValue()
       .withDocumentation("If specified, this class will be used to re-partition records before they are bulk inserted. This can be used to sort, pack, cluster data"
-          + " optimally for common query patterns.");
+          + " optimally for common query patterns. For now we support a built-in user defined bulkinsert partitioner org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner"
+          + " which sorts records based on the column values specified by " + BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS.key());
 
   public static final ConfigProperty<String> UPSERT_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.upsert.shuffle.parallelism")
@@ -894,6 +901,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME);
   }
 
+  public String getUserDefinedBulkInsertPartitionerSortColumns() {
+    return getString(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS);
+  }
+
   public int getInsertShuffleParallelism() {
     return getInt(INSERT_PARALLELISM_VALUE);
   }
@@ -1832,6 +1843,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withUserDefinedBulkInsertPartitionerSortColumns(String columns) {
+      writeConfig.setValue(BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS, columns);
+      return this;
+    }
+
     public Builder withDeleteParallelism(int parallelism) {
       writeConfig.setValue(DELETE_PARALLELISM_VALUE, String.valueOf(parallelism));
       return this;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 209531d..fb3c5ec 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.spark.api.java.JavaRDD;
@@ -41,6 +42,11 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
   private final String[] sortColumnNames;
   private final SerializableSchema serializableSchema;
 
+  public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
+    this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+    this.sortColumnNames = getSortColumnName(config);
+  }
+
   public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
     this.sortColumnNames = columnNames;
     this.serializableSchema = new SerializableSchema(schema);
@@ -79,4 +85,8 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
       throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
     }
   }
+
+  private String[] getSortColumnName(HoodieWriteConfig config) { // names are trimmed, so "a, b" resolves like "a,b"
+    return config.getUserDefinedBulkInsertPartitionerSortColumns().trim().split("\\s*,\\s*");
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 81effaa..f4c5622 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
@@ -139,7 +141,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
 
   @Test
   public void testCustomColumnSortPartitioner() throws Exception {
-    String[] sortColumns = new String[] {"rider"};
+    String sortColumnString = "rider";
+    String[] sortColumns = sortColumnString.split(",");
     Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
@@ -148,6 +151,19 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
         records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
         records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+
+    HoodieWriteConfig config = HoodieWriteConfig
+            .newBuilder()
+            .withPath("/")
+            .withSchema(TRIP_EXAMPLE_SCHEMA)
+            .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+            .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+            .build();
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
+            records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
+            records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+
   }
 
   private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 7e043eb..0dafba4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -96,7 +96,7 @@ public class DataSourceUtils {
     try {
       return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
           ? Option.empty() :
-          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config));
     } catch (Throwable e) {
       throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e);
     }
@@ -115,7 +115,7 @@ public class DataSourceUtils {
     try {
       return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass)
           ? Option.empty() :
-          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass));
+          Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass, config));
     } catch (Throwable e) {
       throw new HoodieException("Could not create UserDefinedBulkInsertPartitionerRows class " + bulkInsertPartitionerClass, e);
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 94f1a69..081a8e4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Conversions;
@@ -74,6 +75,24 @@ public class TestDataSourceUtils {
   private ArgumentCaptor<Option> optionCaptor;
   private HoodieWriteConfig config;
 
+  // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
+  // of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
+  // date type. event_cost1, event_cost2, event_cost3 are decimal logical types following the same pattern as the
+  // event_date fields: event_cost1 & event_cost3 are unions with null, while event_cost2 is a non-union type.
+  private String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
+          + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
+          + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
+          + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
+          + "{\"name\": \"event_name\", \"type\": \"string\"},"
+          + "{\"name\": \"event_organizer\", \"type\": \"string\"},"
+          + "{\"name\": \"event_cost1\", \"type\": "
+          + "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]},"
+          + "{\"name\": \"event_cost2\", \"type\": "
+          + "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}},"
+          + "{\"name\": \"event_cost3\", \"type\": "
+          + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
+          + "]}";
+
   @BeforeEach
   public void setUp() {
     config = HoodieWriteConfig.newBuilder().withPath("/").build();
@@ -81,23 +100,6 @@ public class TestDataSourceUtils {
 
   @Test
   public void testAvroRecordsFieldConversion() {
-    // There are fields event_date1, event_date2, event_date3 with logical type as Date. event_date1 & event_date3 are
-    // of UNION schema type, which is a union of null and date type in different orders. event_date2 is non-union
-    // date type. event_cost1, event_cost2, event3 are decimal logical types with UNION schema, which is similar to
-    // the event_date.
-    String avroSchemaString = "{\"type\": \"record\"," + "\"name\": \"events\"," + "\"fields\": [ "
-        + "{\"name\": \"event_date1\", \"type\" : [{\"type\" : \"int\", \"logicalType\" : \"date\"}, \"null\"]},"
-        + "{\"name\": \"event_date2\", \"type\" : {\"type\": \"int\", \"logicalType\" : \"date\"}},"
-        + "{\"name\": \"event_date3\", \"type\" : [\"null\", {\"type\" : \"int\", \"logicalType\" : \"date\"}]},"
-        + "{\"name\": \"event_name\", \"type\": \"string\"},"
-        + "{\"name\": \"event_organizer\", \"type\": \"string\"},"
-        + "{\"name\": \"event_cost1\", \"type\": "
-        + "[{\"type\": \"fixed\", \"name\": \"dc\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}, \"null\"]},"
-        + "{\"name\": \"event_cost2\", \"type\": "
-        + "{\"type\": \"fixed\", \"name\": \"ef\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}},"
-        + "{\"name\": \"event_cost3\", \"type\": "
-        + "[\"null\", {\"type\": \"fixed\", \"name\": \"fg\", \"size\": 5, \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 6}]}"
-        + "]}";
 
     Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
     GenericRecord record = new GenericData.Record(avroSchema);
@@ -183,6 +185,20 @@ public class TestDataSourceUtils {
     assertThat(partitioner.isPresent(), is(true));
   }
 
+  @Test
+  public void testCreateRDDCustomColumnsSortPartitionerWithValidPartitioner() throws HoodieException {
+    config = HoodieWriteConfig
+            .newBuilder()
+            .withPath("/")
+            .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+            .withUserDefinedBulkInsertPartitionerSortColumns("column1, column2")
+            .withSchema(avroSchemaString)
+            .build();
+
+    Option<BulkInsertPartitioner<Dataset<Row>>> partitioner = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(config);
+    assertThat(partitioner.isPresent(), is(true));
+  }
+
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
     config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
         .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
@@ -195,6 +211,8 @@ public class TestDataSourceUtils {
   public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
       implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
+    public NoOpBulkInsertPartitioner(HoodieWriteConfig config) {}
+
     @Override
     public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
       return records;
@@ -209,6 +227,8 @@ public class TestDataSourceUtils {
   public static class NoOpBulkInsertPartitionerRows
       implements BulkInsertPartitioner<Dataset<Row>> {
 
+    public NoOpBulkInsertPartitionerRows(HoodieWriteConfig config) {}
+
     @Override
     public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
       return records;