You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/06/06 19:48:26 UTC

[hudi] branch master updated: [HUDI-4171] Fixing Non partitioned with virtual keys in read path (#5747)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7da97c8096 [HUDI-4171] Fixing Non partitioned with virtual keys in read path (#5747)
7da97c8096 is described below

commit 7da97c8096ad4eabb49b42c86013679ed5009451
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Mon Jun 6 15:48:21 2022 -0400

    [HUDI-4171] Fixing Non partitioned with virtual keys in read path (#5747)
    
    - When the non-partitioned key generator is used with virtual keys, the read path could break because the table has no partition path field.
---
 .../hudi/common/testutils/HoodieTestUtils.java     | 11 +++++++-
 .../hadoop/HoodieCopyOnWriteTableInputFormat.java  |  7 +++---
 .../hudi/hadoop/realtime/HoodieVirtualKeyInfo.java | 16 ++++++------
 .../apache/hudi/hadoop/realtime/RealtimeSplit.java | 12 ++++++---
 .../utils/HoodieRealtimeInputFormatUtils.java      |  7 ++++--
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  | 27 ++++++++++++++++++++
 .../hudi/hadoop/testutils/InputFormatTestUtil.java | 29 ++++++++++++++++++----
 7 files changed, 87 insertions(+), 22 deletions(-)

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index f9c9898f20..4e507436b9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -103,10 +103,19 @@ public class HoodieTestUtils {
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-                                           HoodieFileFormat baseFileFormat)
+                                           HoodieFileFormat baseFileFormat) throws IOException {
+    return init(hadoopConf, basePath, tableType, baseFileFormat, false, null, true);
+  }
+
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           HoodieFileFormat baseFileFormat, boolean setKeyGen, String keyGenerator, boolean populateMetaFields)
       throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
+    if (setKeyGen) {
+      properties.setProperty("hoodie.datasource.write.keygenerator.class", keyGenerator);
+    }
+    properties.setProperty("hoodie.populate.meta.fields", Boolean.toString(populateMetaFields));
     return init(hadoopConf, basePath, tableType, properties);
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 5ae91dc46d..f11ca88d29 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
@@ -275,16 +276,16 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
     if (tableConfig.populateMetaFields()) {
       return Option.empty();
     }
-
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
     try {
       Schema schema = tableSchemaResolver.getTableAvroSchema();
+      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
       return Option.of(
           new HoodieVirtualKeyInfo(
               tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
               schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              schema.getField(tableConfig.getPartitionFieldProp()).pos()));
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
     } catch (Exception exception) {
       throw new HoodieException("Fetching table schema failed with exception ", exception);
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java
index 763b80d4a1..41031e613d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.util.Option;
+
 import java.io.Serializable;
 
 /**
@@ -26,11 +28,11 @@ import java.io.Serializable;
 public class HoodieVirtualKeyInfo implements Serializable {
 
   private final String recordKeyField;
-  private final String partitionPathField;
+  private final Option<String> partitionPathField;
   private final int recordKeyFieldIndex;
-  private final int partitionPathFieldIndex;
+  private final Option<Integer> partitionPathFieldIndex;
 
-  public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) {
+  public HoodieVirtualKeyInfo(String recordKeyField, Option<String> partitionPathField, int recordKeyFieldIndex, Option<Integer> partitionPathFieldIndex) {
     this.recordKeyField = recordKeyField;
     this.partitionPathField = partitionPathField;
     this.recordKeyFieldIndex = recordKeyFieldIndex;
@@ -41,7 +43,7 @@ public class HoodieVirtualKeyInfo implements Serializable {
     return recordKeyField;
   }
 
-  public String getPartitionPathField() {
+  public Option<String> getPartitionPathField() {
     return partitionPathField;
   }
 
@@ -49,7 +51,7 @@ public class HoodieVirtualKeyInfo implements Serializable {
     return recordKeyFieldIndex;
   }
 
-  public int getPartitionPathFieldIndex() {
+  public Option<Integer> getPartitionPathFieldIndex() {
     return partitionPathFieldIndex;
   }
 
@@ -57,9 +59,9 @@ public class HoodieVirtualKeyInfo implements Serializable {
   public String toString() {
     return "HoodieVirtualKeyInfo{"
         + "recordKeyField='" + recordKeyField + '\''
-        + ", partitionPathField='" + partitionPathField + '\''
+        + ", partitionPathField='" + partitionPathField.orElse("null") + '\''
         + ", recordKeyFieldIndex=" + recordKeyFieldIndex
-        + ", partitionPathFieldIndex=" + partitionPathFieldIndex
+        + ", partitionPathFieldIndex=" + partitionPathFieldIndex.orElse(-1)
         + '}';
   }
 }
\ No newline at end of file
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
index d9b1923c60..4b0b2d6ea7 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
@@ -107,9 +107,12 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
     } else {
       InputSplitUtils.writeBoolean(true, out);
       InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
-      InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out);
       InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
-      InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
+      InputSplitUtils.writeBoolean(virtualKeyInfoOpt.get().getPartitionPathField().isPresent(), out);
+      if (virtualKeyInfoOpt.get().getPartitionPathField().isPresent()) {
+        InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField().get(), out);
+        InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex().get()), out);
+      }
     }
   }
 
@@ -130,9 +133,10 @@ public interface RealtimeSplit extends InputSplitWithLocationInfo {
     boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
     if (hoodieVirtualKeyPresent) {
       String recordKeyField = InputSplitUtils.readString(in);
-      String partitionPathField = InputSplitUtils.readString(in);
       int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
-      int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
+      boolean isPartitionPathFieldPresent = InputSplitUtils.readBoolean(in);
+      Option<String> partitionPathField = isPartitionPathFieldPresent ? Option.of(InputSplitUtils.readString(in)) : Option.empty();
+      Option<Integer> partitionPathIndex = isPartitionPathFieldPresent ? Option.of(Integer.parseInt(InputSplitUtils.readString(in))) : Option.empty();
       setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
     }
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index eb44769d9f..42038e61f6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -87,7 +87,9 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     } else {
       HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
       addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
-      addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
+      if (hoodieVirtualKey.getPartitionPathField().isPresent()) {
+        addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get());
+      }
     }
   }
 
@@ -99,7 +101,8 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
           && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
     } else {
       return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
-          && readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
+          && (!hoodieVirtualKeyInfo.get().getPartitionPathField().isPresent()
+          || readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField().get()));
     }
   }
 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 10ea84f5dc..ccc57d0f61 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -27,12 +27,15 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -40,6 +43,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -55,6 +61,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -167,6 +174,26 @@ public class TestHoodieParquetInputFormat {
     assertEquals(10, files.length);
   }
 
+  @Test
+  public void testInputFormatLoadForNonPartitionedAndVirtualKeyedTable() throws IOException {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+    File partitionDir = InputFormatTestUtil.prepareCustomizedTable(basePath, baseFileFormat, 10, "100", true, false,
+        true, schema);
+    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
+        schema.toString(), HoodieTimeline.COMMIT_ACTION);
+    FileCreateUtils.createCommit(basePath.toString(), "100", Option.of(commitMetadata));
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+    assertEquals(10, inputSplits.length);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+  }
+
   @Test
   public void testInputFormatLoadWithEmptyTable() throws IOException {
     // initial hoodie table
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 1081e43175..db8002cd2d 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -71,16 +71,35 @@ public class InputFormatTestUtil {
   private static String TEST_WRITE_TOKEN = "1-0-1";
 
   public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
-                                  String commitNumber)
+                                  String commitNumber) throws IOException {
+    return prepareCustomizedTable(basePath, baseFileFormat, numberOfFiles, commitNumber, false, true, false, null);
+  }
+
+  public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
+                                  String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema)
       throws IOException {
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
-        baseFileFormat);
+    if (useNonPartitionedKeyGen) {
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+          baseFileFormat, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", populateMetaFields);
+    } else {
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+          baseFileFormat);
+    }
 
     java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
     setupPartition(basePath, partitionPath);
 
-    return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
-        commitNumber);
+    if (injectData) {
+      try {
+        createSimpleData(schema, partitionPath, numberOfFiles, 100, commitNumber);
+        return partitionPath.toFile();
+      } catch (Exception e) {
+        throw new IOException("Exception thrown while writing data ", e);
+      }
+    } else {
+      return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
+          commitNumber);
+    }
   }
 
   public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,