Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/03 22:03:23 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #5664: [HUDI-4140] Fixing hive style partitioning and default partition with bulk insert row writer with SimpleKeyGen and virtual keys

yihua commented on code in PR #5664:
URL: https://github.com/apache/hudi/pull/5664#discussion_r889380010


##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -87,6 +89,7 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+    this.isHiveStylePartitioning = writeConfig.isHiveStylePartitioningEnabled();

Review Comment:
   nit: `writeConfig` is already stored in this helper, so do we need a separate member variable `isHiveStylePartitioning`? The flag could be read from `writeConfig.isHiveStylePartitioningEnabled()` where it is used.
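
A minimal sketch of what this nit suggests, assuming the helper keeps a reference to its write config: the flag is read through the config at the point of use instead of being copied into a second field. `WriterHelperSketch`, `WriteConfigStub` and `partitionPathFor` are hypothetical stand-ins so the example compiles on its own; only `isHiveStylePartitioningEnabled()` is taken from the diff above.

```java
import java.util.Objects;

public class WriterHelperSketch {

    /** Hypothetical stand-in for HoodieWriteConfig; only the getter from the diff is modeled. */
    public static class WriteConfigStub {
        private final boolean hiveStylePartitioning;

        public WriteConfigStub(boolean hiveStylePartitioning) {
            this.hiveStylePartitioning = hiveStylePartitioning;
        }

        public boolean isHiveStylePartitioningEnabled() {
            return hiveStylePartitioning;
        }
    }

    // The config is the single source of truth; no separate boolean field is cached.
    private final WriteConfigStub writeConfig;

    public WriterHelperSketch(WriteConfigStub writeConfig) {
        this.writeConfig = Objects.requireNonNull(writeConfig, "writeConfig");
    }

    /** Hypothetical method: prefixes the value with "field=" only when hive-style is enabled. */
    public String partitionPathFor(String field, String value) {
        return writeConfig.isHiveStylePartitioningEnabled() ? field + "=" + value : value;
    }
}
```

The trade-off is one extra method call per record against one fewer field to keep in sync with the config.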



##########
hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java:
##########
@@ -109,6 +109,48 @@ public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) t
     }
   }
 
+  @Test
+  public void testDataInternalWriterHiveStylePartitioning() throws Exception {
+    boolean sorted = true;
+    boolean populateMetaFields = false;
+    // init config and table
+    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    for (int i = 0; i < 1; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
+          STRUCT_TYPE, populateMetaFields, sorted);
+
+      int size = 10 + RANDOM.nextInt(1000);
+      // write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
+      int batches = 3;
+      Dataset<Row> totalInputRows = null;
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit();
+      Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
+      Option<List<String>> fileNames = Option.of(new ArrayList<>());
+
+      // verify write statuses
+      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames);
+
+      // verify rows
+      Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
+      assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);

Review Comment:
   Do we want to validate the hive-style partition path value somewhere?
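
One way such a validation could look, as a sketch only: a small check that every partition path produced by the writer has the hive-style `field=value` shape. `HiveStylePathCheck` and `allHiveStyle` are hypothetical names, and how the partition paths are collected from the write statuses is left out.

```java
import java.util.List;

public class HiveStylePathCheck {

    /**
     * Returns true when every partition path starts with "<partitionField>="
     * and carries a non-empty value after the prefix.
     * An empty list returns true, so callers should also assert the list size.
     */
    public static boolean allHiveStyle(List<String> partitionPaths, String partitionField) {
        String prefix = partitionField + "=";
        for (String path : partitionPaths) {
            if (path == null || !path.startsWith(prefix) || path.length() == prefix.length()) {
                return false;
            }
        }
        return true;
    }
}
```

In the test above, a call along the lines of `assertTrue(HiveStylePathCheck.allHiveStyle(paths, fieldName))` after `assertWriteStatuses` would cover the new behavior, where `paths` and `fieldName` are whatever the test has at hand.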



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -128,7 +133,11 @@ public void write(InternalRow record) throws IOException {
         if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
           partitionPath = "";
         } else if (simpleKeyGen) { // SimpleKeyGen
-          partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
+          Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
+          partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+          if (isHiveStylePartitioning) {
+            partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;

Review Comment:
   For `SimpleKeyGenerator`, there can be only one partition path field. Is that correct?



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -128,7 +133,11 @@ public void write(InternalRow record) throws IOException {
         if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
           partitionPath = "";
         } else if (simpleKeyGen) { // SimpleKeyGen
-          partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
+          Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType);
+          partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+          if (isHiveStylePartitioning) {
+            partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;

Review Comment:
   @nsivabalan could you leverage the `SimpleKeyGenerator::getPartitionPath(GenericRecord record)` or `KeyGenUtils::getPartitionPath` API instead of hardcoding the partition path construction here?
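
To illustrate the point about not hardcoding the construction, here is a sketch of the same logic pulled into one shared helper. It is not the implementation of `KeyGenUtils::getPartitionPath`; `PartitionPathSketch`, `partitionPath` and the placeholder default value are assumptions made for this example, and the null handling mirrors only what the diff above does.

```java
public class PartitionPathSketch {

    // Placeholder only. In the diff the fallback is PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH,
    // whose actual value is not reproduced here.
    static final String DEFAULT_PARTITION_PATH_PLACEHOLDER = "default_partition_placeholder";

    /**
     * Builds a partition path for a single partition field.
     * A null value falls back to the default partition; hive-style adds the "field=" prefix.
     */
    public static String partitionPath(String field, Object value, boolean hiveStylePartitioning) {
        String path = value != null ? value.toString() : DEFAULT_PARTITION_PATH_PLACEHOLDER;
        return hiveStylePartitioning ? field + "=" + path : path;
    }
}
```

Keeping this in one place means the row-writer path and the key generator path cannot drift apart on the default value or the prefix format.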


