Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/28 22:17:26 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5462: [HUDI-3995] Making perf optimizations for bulk insert row writer path

alexeykudinkin commented on code in PR #5462:
URL: https://github.com/apache/hudi/pull/5462#discussion_r861339922


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -97,88 +98,69 @@ public String getPartitionPath(Row row) {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(InternalRow internalRow, StructType structType) {
     try {
-      initDeserializer(structType);
-      Row row = sparkRowSerDe.deserializeRow(internalRow);
-      return getPartitionPath(row);
+      buildFieldSchemaInfoIfNeeded(structType);
+      return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
+          hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
       throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
     }
   }
 
-  private void initDeserializer(StructType structType) {
-    if (sparkRowSerDe == null) {
-      sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
-    }
-  }
-
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
       // parse simple fields
       getRecordKeyFields().stream()
           .filter(f -> !(f.contains(".")))
           .forEach(f -> {
             if (structType.getFieldIndex(f).isDefined()) {
-              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              int fieldIndex = (int) structType.getFieldIndex(f).get();

Review Comment:
   Why do we need this bifurcation between "simple" and "nested" fields?
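
   They could be resolved uniformly by treating a simple field as a one-segment path, e.g. (just a sketch; `resolveFieldSchemaInfo` is an illustrative name, not something in this PR):
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.hudi.common.util.collection.Pair;
   import org.apache.hudi.exception.HoodieKeyException;
   import org.apache.spark.sql.types.DataType;
   import org.apache.spark.sql.types.StructType;

   // Walks the struct segment by segment; a simple field is just the
   // one-segment case, producing the same Pair<List<Integer>, DataType>
   // shape that recordKeySchemaInfo/partitionPathSchemaInfo hold.
   static Pair<List<Integer>, DataType> resolveFieldSchemaInfo(StructType structType, String field) {
     List<Integer> positions = new ArrayList<>();
     StructType current = structType;
     DataType leafType = null;
     String[] segments = field.split("\\.");
     for (int i = 0; i < segments.length; i++) {
       if (!current.getFieldIndex(segments[i]).isDefined()) {
         throw new HoodieKeyException("Field not found: \"" + field + "\"");
       }
       int idx = (int) current.getFieldIndex(segments[i]).get();
       positions.add(idx);
       leafType = current.fields()[idx].dataType();
       if (i < segments.length - 1) {
         // intermediate segments must themselves be structs
         current = (StructType) leafType;
       }
     }
     return Pair.of(positions, leafType);
   }
   ```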



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -107,16 +107,15 @@ public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, S
   /**
    * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
    * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   *
    * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
    * @throws IOException
    */
   public void write(InternalRow record) throws IOException {
     try {
-      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
-          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String partitionPath = record.getUTF8String(3).toString();

Review Comment:
   Let's create constants for these hard-coded meta-column ordinals.
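
   E.g. (sketch; constant names are illustrative, ordinals mirror `HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS`):
   ```java
   // Meta columns are laid out first and in a fixed order:
   // _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
   // _hoodie_partition_path, _hoodie_file_name.
   public static final int COMMIT_TIME_META_FIELD_ORD = 0;
   public static final int COMMIT_SEQNO_META_FIELD_ORD = 1;
   public static final int RECORD_KEY_META_FIELD_ORD = 2;
   public static final int PARTITION_PATH_META_FIELD_ORD = 3;
   public static final int FILENAME_META_FIELD_ORD = 4;
   ```
   so that `write` can read `record.getUTF8String(PARTITION_PATH_META_FIELD_ORD)` instead of a bare `3`.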



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java:
##########
@@ -234,13 +237,14 @@ public static Object getNestedFieldVal(Row row, List<Integer> positions) {
    * @param structType  schema of interest
    * @param field       field of interest for which the positions are requested for
    * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
-   * @return the positions of the field as per the struct type.
+   * @return the positions of the field as per the struct type and the root fields datatype.
    */
-  public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
+  public static Pair<List<Integer>, DataType> getNestedFieldSchemaInfo(StructType structType, String field, boolean isRecordKey) {

Review Comment:
   What do we need `isRecordKey` for?
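
   If it only toggles the missing-field behavior (record keys throw, partition paths fall back to the `-1` sentinel, as in the simple-field handling), a cleaner shape might be to return an `Option` and let each caller decide — a sketch, not what the PR does:
   ```java
   import org.apache.hudi.common.util.Option;

   public static Option<Pair<List<Integer>, DataType>> getNestedFieldSchemaInfo(StructType structType, String field) {
     // resolve nested positions and the leaf DataType as before;
     // return Option.empty() when the field is absent, instead of
     // branching on an isRecordKey flag inside the helper
     return Option.empty(); // placeholder body
   }

   // record-key caller decides that absence is fatal:
   Option<Pair<List<Integer>, DataType>> info = getNestedFieldSchemaInfo(structType, f);
   if (!info.isPresent()) {
     throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
   }
   recordKeySchemaInfo.put(f, info.get());
   ```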



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -97,88 +98,69 @@ public String getPartitionPath(Row row) {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(InternalRow internalRow, StructType structType) {
     try {
-      initDeserializer(structType);
-      Row row = sparkRowSerDe.deserializeRow(internalRow);
-      return getPartitionPath(row);
+      buildFieldSchemaInfoIfNeeded(structType);
+      return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
+          hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
       throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
     }
   }
 
-  private void initDeserializer(StructType structType) {
-    if (sparkRowSerDe == null) {
-      sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
-    }
-  }
-
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
       // parse simple fields
       getRecordKeyFields().stream()
           .filter(f -> !(f.contains(".")))
           .forEach(f -> {
             if (structType.getFieldIndex(f).isDefined()) {
-              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              int fieldIndex = (int) structType.getFieldIndex(f).get();
+              recordKeySchemaInfo.put(f, Pair.of(Collections.singletonList((fieldIndex)), structType.fields()[fieldIndex].dataType()));
             } else {
               throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
             }
           });
       // parse nested fields
       getRecordKeyFields().stream()
           .filter(f -> f.contains("."))
-          .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+          .forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getNestedFieldSchemaInfo(structType, f, true)));
       // parse simple fields
       if (getPartitionPathFields() != null) {
         getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
             .forEach(f -> {
               if (structType.getFieldIndex(f).isDefined()) {
-                partitionPathPositions.put(f,
-                    Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+                int fieldIndex = (int) structType.getFieldIndex(f).get();
+                partitionPathSchemaInfo.put(f,
+                    Pair.of(Collections.singletonList(fieldIndex), structType.fields()[fieldIndex].dataType()));
               } else {
-                partitionPathPositions.put(f, Collections.singletonList(-1));
+                partitionPathSchemaInfo.put(f, Pair.of(Collections.singletonList(-1), null));
               }
             });
         // parse nested fields
         getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
-            .forEach(f -> partitionPathPositions.put(f,
-                RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+            .forEach(f -> partitionPathSchemaInfo.put(f,
+                RowKeyGeneratorHelper.getNestedFieldSchemaInfo(structType, f, false)));
       }
       this.structType = structType;
     }
   }
 
   protected String getPartitionPathInternal(InternalRow row, StructType structType) {
-    buildFieldDataTypesMapIfNeeded(structType);
+    buildFieldSchemaInfoIfNeeded(structType);
     validatePartitionFieldsForInternalRow();
     return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
-        hiveStylePartitioning, partitionPathPositions, partitionPathDataTypes);
+        hiveStylePartitioning, partitionPathSchemaInfo);
   }
 
   protected void validatePartitionFieldsForInternalRow() {

Review Comment:
   We should run validation once when we build the mapping, instead of doing it for every lookup.
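
   i.e., roughly (sketch):
   ```java
   void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
       // ... populate recordKeySchemaInfo / partitionPathSchemaInfo as above ...
       // validate exactly once, right after the mapping is built
       validatePartitionFieldsForInternalRow();
       this.structType = structType;
     }
   }

   protected String getPartitionPathInternal(InternalRow row, StructType structType) {
     buildFieldSchemaInfoIfNeeded(structType);
     // no per-row validation here anymore
     return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
         hiveStylePartitioning, partitionPathSchemaInfo);
   }
   ```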



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java:
##########
@@ -57,18 +61,18 @@ public class HoodieDatasetBulkInsertHelper {
 
   /**
    * Prepares input hoodie spark dataset for bulk insert. It does the following steps.
-   *  1. Uses KeyGenerator to generate hoodie record keys and partition path.
-   *  2. Add hoodie columns to input spark dataset.
-   *  3. Reorders input dataset columns so that hoodie columns appear in the beginning.
-   *  4. Sorts input dataset by hoodie partition path and record key
+   * 1. Uses KeyGenerator to generate hoodie record keys and partition path.
+   * 2. Add hoodie columns to input spark dataset.
+   * 3. Reorders input dataset columns so that hoodie columns appear in the beginning.
+   * 4. Sorts input dataset by hoodie partition path and record key
    *
    * @param sqlContext SQL Context
-   * @param config Hoodie Write Config
-   * @param rows Spark Input dataset
+   * @param config     Hoodie Write Config
+   * @param rows       Spark Input dataset
    * @return hoodie dataset which is ready for bulk insert.
    */
   public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
-      HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,
+                                                               HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace,

Review Comment:
   As we've discussed, I'm going to rewrite this method as part of HUDI-3993, so there's no point in changing it here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -386,12 +386,14 @@ public void validateTableProperties(Properties properties, WriteOperationType op
       throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
     }
 
-    // meta fields can be disabled only with SimpleKeyGenerator
-    if (!getTableConfig().populateMetaFields()
-        && !properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator")
-        .equals("org.apache.hudi.keygen.SimpleKeyGenerator")) {
-      throw new HoodieException("Only simple key generator is supported when meta fields are disabled. KeyGenerator used : "
-          + properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key()));
+    // meta fields can be disabled only with SimpleKeyGenerator, NonPartitioned and ComplexKeyGen.
+    if (!getTableConfig().populateMetaFields()) {

Review Comment:
   Why is this changing?
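
   If it does need to change, I'd expect a single whitelist rather than chained equality checks, e.g. (sketch; class names taken from the updated comment above, the exact set should match what the row writer path actually supports):
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   private static final Set<String> KEYGENS_ALLOWED_WITHOUT_META_FIELDS = new HashSet<>(Arrays.asList(
       "org.apache.hudi.keygen.SimpleKeyGenerator",
       "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
       "org.apache.hudi.keygen.ComplexKeyGenerator"));

   // inside validateTableProperties(...):
   if (!getTableConfig().populateMetaFields()) {
     String keyGenClass = properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(),
         "org.apache.hudi.keygen.SimpleKeyGenerator");
     if (!KEYGENS_ALLOWED_WITHOUT_META_FIELDS.contains(keyGenClass)) {
       throw new HoodieException("Key generator " + keyGenClass
           + " is not supported when meta fields are disabled");
     }
   }
   ```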



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org