You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/05 00:04:25 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5462: [HUDI-3995] Making pref optimizations for bulk insert row writer path

alexeykudinkin commented on code in PR #5462:
URL: https://github.com/apache/hudi/pull/5462#discussion_r865419555


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -97,87 +101,69 @@ public String getPartitionPath(Row row) {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(InternalRow internalRow, StructType structType) {
     try {
-      initDeserializer(structType);
-      Row row = sparkRowSerDe.deserializeRow(internalRow);
-      return getPartitionPath(row);
+      buildFieldSchemaInfoIfNeeded(structType);
+      return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
+          hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
       throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
     }
   }
 
-  private void initDeserializer(StructType structType) {
-    if (sparkRowSerDe == null) {
-      sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
-    }
-  }
-
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
-      // parse simple fields
-      getRecordKeyFields().stream()
-          .filter(f -> !(f.contains(".")))
+      getRecordKeyFields()
+          .stream().filter(f -> !f.isEmpty())
           .forEach(f -> {
-            if (structType.getFieldIndex(f).isDefined()) {
-              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+            if (f.contains(DOT_STRING)) {

Review Comment:
   We don't need this conditional -- simple field ref is a special case of nested field-ref and so should be handled by the same path that handles nested fields



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -18,48 +18,51 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructType;
-import scala.Function1;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.Function1;
 
 /**
  * Base class for the built-in key generators. Contains methods structured for
  * code reuse amongst them.
  */
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
 
+  private static final String DOT_STRING = ".";
   private static final String STRUCT_NAME = "hoodieRowTopLevelField";
   private static final String NAMESPACE = "hoodieRow";
-  private transient Function1<Row, GenericRecord> converterFn = null;
-  private SparkRowSerDe sparkRowSerDe;
+  private Function1<Row, GenericRecord> converterFn = null;
   protected StructType structType;
+  private static AtomicBoolean validatePartitionFields = new AtomicBoolean(false);

Review Comment:
   Why is this static?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -97,87 +101,69 @@ public String getPartitionPath(Row row) {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(InternalRow internalRow, StructType structType) {
     try {
-      initDeserializer(structType);
-      Row row = sparkRowSerDe.deserializeRow(internalRow);
-      return getPartitionPath(row);
+      buildFieldSchemaInfoIfNeeded(structType);
+      return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
+          hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
       throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
     }
   }
 
-  private void initDeserializer(StructType structType) {
-    if (sparkRowSerDe == null) {
-      sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
-    }
-  }
-
-  void buildFieldPositionMapIfNeeded(StructType structType) {
+  void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
-      // parse simple fields
-      getRecordKeyFields().stream()
-          .filter(f -> !(f.contains(".")))
+      getRecordKeyFields()
+          .stream().filter(f -> !f.isEmpty())
           .forEach(f -> {
-            if (structType.getFieldIndex(f).isDefined()) {
-              recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+            if (f.contains(DOT_STRING)) {
+              // nested field
+              recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getNestedFieldSchemaInfo(structType, f, true));
             } else {
-              throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
+              // simple field
+              if (structType.getFieldIndex(f).isDefined()) {
+                int fieldIndex = (int) structType.getFieldIndex(f).get();
+                recordKeySchemaInfo.put(f, Pair.of(Collections.singletonList((fieldIndex)), structType.fields()[fieldIndex].dataType()));
+              } else {
+                throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
+              }
             }
           });
-      // parse nested fields
-      getRecordKeyFields().stream()
-          .filter(f -> f.contains("."))
-          .forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
-      // parse simple fields
       if (getPartitionPathFields() != null) {
-        getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
+        getPartitionPathFields().stream().filter(f -> !f.isEmpty())
             .forEach(f -> {
-              if (structType.getFieldIndex(f).isDefined()) {
-                partitionPathPositions.put(f,
-                    Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
+              // nested field

Review Comment:
   Same comment as above



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java:
##########
@@ -42,7 +42,9 @@
   public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
   public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
 
-  public static int FILENAME_METADATA_FIELD_POS = 4;
+  public static int RECORD_KEY_METAD_FIELD_POS = 2;

Review Comment:
   Let's tie this back to `HOODIE_META_COLUMNS_NAME_TO_POS`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java:
##########
@@ -234,13 +237,14 @@ public static Object getNestedFieldVal(Row row, List<Integer> positions) {
    * @param structType  schema of interest
    * @param field       field of interest for which the positions are requested for
    * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
-   * @return the positions of the field as per the struct type.
+   * @return the positions of the field as per the struct type and the root fields datatype.
    */
-  public static List<Integer> getNestedFieldIndices(StructType structType, String field, boolean isRecordKey) {
+  public static Pair<List<Integer>, DataType> getNestedFieldSchemaInfo(StructType structType, String field, boolean isRecordKey) {

Review Comment:
   This is a low-level utility that should not be aware what it reads -- it should simply be able to fetch particular field, provided the nested field path, and its up to caller to decide how to handle different cases



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org