Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/19 01:01:06 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #5470: [HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation

yihua commented on code in PR #5470:
URL: https://github.com/apache/hudi/pull/5470#discussion_r923937210


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {

Review Comment:
   Do we need to check `containsMetaFields` here?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -24,31 +24,66 @@
 import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Arrays;
+
 /**
- * Internal Row implementation for Hoodie Row. It wraps an {@link InternalRow} and keeps meta columns locally. But the {@link InternalRow}
- * does include the meta columns as well just that {@link HoodieInternalRow} will intercept queries for meta columns and serve from its
- * copy rather than fetching from {@link InternalRow}.
+ * Hudi internal implementation of the {@link InternalRow} allowing to extend arbitrary
+ * {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
+ *
+ * Capable of overlaying meta-fields in both cases: whether original {@link #row} contains
+ * meta columns or not. This allows to handle following use-cases allowing to avoid any
+ * manipulation (reshuffling) of the source row, by simply creating new instance
+ * of {@link HoodieInternalRow} with all the meta-values provided
+ *
+ * <ul>
+ *   <li>When meta-fields need to be prepended to the source {@link InternalRow}</li>
+ *   <li>When meta-fields need to be updated w/in the source {@link InternalRow}
+ *   ({@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} currently does not
+ *   allow in-place updates due to its memory layout)</li>
+ * </ul>
  */
 public class HoodieInternalRow extends InternalRow {
 
-  private String commitTime;
-  private String commitSeqNumber;
-  private String recordKey;
-  private String partitionPath;
-  private String fileName;
-  private InternalRow row;
-
-  public HoodieInternalRow(String commitTime, String commitSeqNumber, String recordKey, String partitionPath,
-      String fileName, InternalRow row) {
-    this.commitTime = commitTime;
-    this.commitSeqNumber = commitSeqNumber;
-    this.recordKey = recordKey;
-    this.partitionPath = partitionPath;
-    this.fileName = fileName;
+  /**
+   * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   */
+  private final UTF8String[] metaFields;
+  private final InternalRow row;
+
+  /**
+   * Specifies whether source {@link #row} contains meta-fields
+   */
+  private final boolean containsMetaFields;
+
+  public HoodieInternalRow(UTF8String commitTime,
+                           UTF8String commitSeqNumber,
+                           UTF8String recordKey,
+                           UTF8String partitionPath,
+                           UTF8String fileName,
+                           InternalRow row,
+                           boolean containsMetaFields) {
+    this.metaFields = new UTF8String[] {
+        commitTime,
+        commitSeqNumber,
+        recordKey,
+        partitionPath,
+        fileName
+    };
+
     this.row = row;
+    this.containsMetaFields = containsMetaFields;

Review Comment:
   if `containsMetaFields` is false, should the length of `metaFields` be 0?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
   }
 
   @Override
-  public void setNullAt(int i) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = null;
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = null;
-          break;
-        }
-        case 2: {
-          this.recordKey = null;
-          break;
-        }
-        case 3: {
-          this.partitionPath = null;
-          break;
-        }
-        case 4: {
-          this.fileName = null;
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
-      }
+  public void setNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      metaFields[ordinal] = null;
     } else {
-      row.setNullAt(i);
+      row.setNullAt(rebaseOrdinal(ordinal));
     }
   }
 
   @Override
-  public void update(int i, Object value) {
-    if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      switch (i) {
-        case 0: {
-          this.commitTime = value.toString();
-          break;
-        }
-        case 1: {
-          this.commitSeqNumber = value.toString();
-          break;
-        }
-        case 2: {
-          this.recordKey = value.toString();
-          break;
-        }
-        case 3: {
-          this.partitionPath = value.toString();
-          break;
-        }
-        case 4: {
-          this.fileName = value.toString();
-          break;
-        }
-        default: throw new IllegalArgumentException("Not expected");
+  public void update(int ordinal, Object value) {
+    if (ordinal < metaFields.length) {
+      if (value instanceof UTF8String) {
+        metaFields[ordinal] = (UTF8String) value;
+      } else if (value instanceof String) {
+        metaFields[ordinal] = UTF8String.fromString((String) value);
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
       }
     } else {
-      row.update(i, value);
+      row.update(rebaseOrdinal(ordinal), value);
     }
   }
 
-  private String getMetaColumnVal(int ordinal) {
-    switch (ordinal) {
-      case 0: {
-        return commitTime;
-      }
-      case 1: {
-        return commitSeqNumber;
-      }
-      case 2: {
-        return recordKey;
-      }
-      case 3: {
-        return partitionPath;
-      }
-      case 4: {
-        return fileName;
-      }
-      default: throw new IllegalArgumentException("Not expected");
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaFields.length) {
+      return metaFields[ordinal] == null;
     }
+    return row.isNullAt(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public boolean isNullAt(int ordinal) {
+  public UTF8String getUTF8String(int ordinal) {
+    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+      return metaFields[ordinal];
+    }
+    return row.getUTF8String(rebaseOrdinal(ordinal));
+  }
+
+  @Override
+  public Object get(int ordinal, DataType dataType) {
     if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return null == getMetaColumnVal(ordinal);
+      validateMetaFieldDataType(dataType);
+      return metaFields[ordinal];
     }
-    return row.isNullAt(ordinal);
+    return row.get(rebaseOrdinal(ordinal), dataType);
   }
 
   @Override
   public boolean getBoolean(int ordinal) {
-    return row.getBoolean(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+    return row.getBoolean(rebaseOrdinal(ordinal));
   }
 
   @Override
   public byte getByte(int ordinal) {
-    return row.getByte(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte.class);
+    return row.getByte(rebaseOrdinal(ordinal));
   }
 
   @Override
   public short getShort(int ordinal) {
-    return row.getShort(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Short.class);
+    return row.getShort(rebaseOrdinal(ordinal));
   }
 
   @Override
   public int getInt(int ordinal) {
-    return row.getInt(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Integer.class);
+    return row.getInt(rebaseOrdinal(ordinal));
   }
 
   @Override
   public long getLong(int ordinal) {
-    return row.getLong(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Long.class);
+    return row.getLong(rebaseOrdinal(ordinal));
   }
 
   @Override
   public float getFloat(int ordinal) {
-    return row.getFloat(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Float.class);
+    return row.getFloat(rebaseOrdinal(ordinal));
   }
 
   @Override
   public double getDouble(int ordinal) {
-    return row.getDouble(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Double.class);
+    return row.getDouble(rebaseOrdinal(ordinal));
   }
 
   @Override
   public Decimal getDecimal(int ordinal, int precision, int scale) {
-    return row.getDecimal(ordinal, precision, scale);
-  }
-
-  @Override
-  public UTF8String getUTF8String(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getUTF8String(ordinal);
-  }
-
-  @Override
-  public String getString(int ordinal) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return new String(getMetaColumnVal(ordinal).getBytes());
-    }
-    return row.getString(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+    return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
   }
 
   @Override
   public byte[] getBinary(int ordinal) {
-    return row.getBinary(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+    return row.getBinary(rebaseOrdinal(ordinal));
   }
 
   @Override
   public CalendarInterval getInterval(int ordinal) {
-    return row.getInterval(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+    return row.getInterval(rebaseOrdinal(ordinal));
   }
 
   @Override
   public InternalRow getStruct(int ordinal, int numFields) {
-    return row.getStruct(ordinal, numFields);
+    ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+    return row.getStruct(rebaseOrdinal(ordinal), numFields);
   }
 
   @Override
   public ArrayData getArray(int ordinal) {
-    return row.getArray(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+    return row.getArray(rebaseOrdinal(ordinal));
   }
 
   @Override
   public MapData getMap(int ordinal) {
-    return row.getMap(ordinal);
+    ruleOutMetaFieldsAccess(ordinal, MapData.class);
+    return row.getMap(rebaseOrdinal(ordinal));
   }
 
   @Override
-  public Object get(int ordinal, DataType dataType) {
-    if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
-      return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+  public InternalRow copy() {
+    return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
+  }
+
+  private int rebaseOrdinal(int ordinal) {
+    // NOTE: In cases when source row does not contain meta fields, we will have to
+    //       rebase ordinal onto its indexes
+    return containsMetaFields ? ordinal : ordinal - metaFields.length;

Review Comment:
   If the source row does not contain meta fields (`containsMetaFields` is false), and assuming `metaFields` is empty, wouldn't the ordinal-adjustment logic here be unnecessary?
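   For reference, a small worked example of the rebasing above (illustration only; `metaFieldsLength` stands in for `metaFields.length`):

   ```java
   // With 5 meta columns, combined-view ordinal 7 addresses the 3rd data column of a
   // source row that carries no meta columns, but the 8th column of one that does.
   boolean containsMetaFields = false;
   int metaFieldsLength = 5;
   int ordinal = 7;
   int rebased = containsMetaFields ? ordinal : ordinal - metaFieldsLength; // -> 2
   ```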



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriter.java:
##########
@@ -37,7 +38,7 @@ public interface HoodieInternalRowFileWriter {
    *
    * @throws IOException on any exception while writing.
    */
-  void writeRow(String key, InternalRow row) throws IOException;
+  void writeRow(UTF8String key, InternalRow row) throws IOException;

Review Comment:
   Is the `UTF8String` type used here for performance?
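   If that is the motivation, the saving would be skipping a per-record decode into `java.lang.String` on the write path. A rough sketch of the cost the old `String`-based signature implied (illustration only; `row` and `keyOrdinal` are hypothetical):

   ```java
   // UnsafeRow#getUTF8String returns a view over the row's UTF-8 bytes without copying them,
   // while toString() decodes those bytes into a fresh java.lang.String on every record.
   UTF8String key = row.getUTF8String(keyOrdinal); // cheap view, no byte copy
   String javaKey = key.toString();                // decode + allocation, per record
   ```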



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java:
##########
@@ -30,7 +30,17 @@
 public class HoodieTimer {
 
   // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time
-  Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
+  private final Deque<TimeInfo> timeInfoDeque = new ArrayDeque<>();
+
+  public HoodieTimer() {
+    this(false);
+  }
+
+  public HoodieTimer(boolean shouldStart) {
+    if (shouldStart) {
+      startTimer();
+    }
+  }

Review Comment:
   This is less obvious (`timer = new HoodieTimer(true)`) compared to the existing way (`timer = new HoodieTimer().startTimer()`).  Should we revert the change?
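   For comparison, the two usages side by side (a minimal sketch; the fluent form relies on `startTimer()` returning the timer, as the existing idiom above implies):

   ```java
   HoodieTimer implicitStart = new HoodieTimer(true);          // form added in this PR
   HoodieTimer explicitStart = new HoodieTimer().startTimer(); // existing, self-describing form
   ```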



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -73,18 +75,15 @@ public WriteSupport.FinalizedWriteContext finalizeWrite() {
     return new WriteSupport.FinalizedWriteContext(extraMetaData);
   }
 
-  public void add(String recordKey) {
-    this.bloomFilter.add(recordKey);
-    if (minRecordKey != null) {
-      minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
-    } else {
-      minRecordKey = recordKey;
+  public void add(UTF8String recordKey) {
+    this.bloomFilter.add(recordKey.getBytes());

Review Comment:
   So the byte content of the `String` and `UTF8String` instances should be the same here, right?  That way the bloom filter lookup based on the String key is not affected.
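   A quick sanity check of that assumption: `UTF8String` stores the key as UTF-8 bytes, so the bloom filter sees the same bytes either way (assuming the previous `add(String)` path also hashed the key's UTF-8 bytes):

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import org.apache.spark.unsafe.types.UTF8String;

   public class Utf8BytesCheck {
     public static void main(String[] args) {
       String recordKey = "partition/2022-07-19/record-0001";
       byte[] viaString = recordKey.getBytes(StandardCharsets.UTF_8);
       byte[] viaUtf8String = UTF8String.fromString(recordKey).getBytes();
       // Identical byte content, hence identical bloom filter hashing for the same key
       System.out.println(Arrays.equals(viaString, viaUtf8String)); // true
     }
   }
   ```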



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -88,13 +91,21 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
     this.fileIdPrefix = UUID.randomUUID().toString();
+
     if (!populateMetaFields) {
       this.keyGeneratorOpt = getKeyGenerator(writeConfig.getProps());
-      if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
-        simpleKeyGen = true;
-        simplePartitionFieldIndex = (Integer) structType.getFieldIndex((keyGeneratorOpt.get()).getPartitionPathFields().get(0)).get();
-        simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
-      }
+    } else {
+      this.keyGeneratorOpt = Option.empty();
+    }
+
+    if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
+      this.simpleKeyGen = true;
+      this.simplePartitionFieldIndex = (Integer) structType.getFieldIndex(keyGeneratorOpt.get().getPartitionPathFields().get(0)).get();
+      this.simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
+    } else {
+      this.simpleKeyGen = false;
+      this.simplePartitionFieldIndex = -1;
+      this.simplePartitionFieldDataType = null;

Review Comment:
   The question is out of scope for this PR, but why do we need special-case handling for the simple key generator here?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -102,20 +114,20 @@ public String getPartitionPath(InternalRow internalRow, StructType structType) {
       return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
           hiveStylePartitioning, partitionPathSchemaInfo);
     } catch (Exception e) {
-      throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
+      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
     }
   }
 
   void buildFieldSchemaInfoIfNeeded(StructType structType) {
     if (this.structType == null) {
+      this.structType = structType;

Review Comment:
   Does the change of order have any side effect?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -73,18 +75,15 @@ public WriteSupport.FinalizedWriteContext finalizeWrite() {
     return new WriteSupport.FinalizedWriteContext(extraMetaData);
   }
 
-  public void add(String recordKey) {
-    this.bloomFilter.add(recordKey);
-    if (minRecordKey != null) {
-      minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
-    } else {
-      minRecordKey = recordKey;
+  public void add(UTF8String recordKey) {
+    this.bloomFilter.add(recordKey.getBytes());
+
+    if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
+      minRecordKey =  recordKey.copy();
     }
 
-    if (maxRecordKey != null) {
-      maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
-    } else {
-      maxRecordKey = recordKey;
+    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
+      maxRecordKey = recordKey.copy();

Review Comment:
   Could the `copy()` here introduce overhead?  Does it have to be a deep copy?
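   One likely reason for the copy (an assumption, not confirmed in this thread): a `UTF8String` pulled out of an `UnsafeRow` is a view over the row's backing buffer, which Spark may reuse for the next record, so a retained reference could change underneath us. A minimal sketch:

   ```java
   // copy() duplicates just this key's bytes (not the whole row) into a fresh array,
   // so min/max keys stay stable even if the underlying row buffer is reused.
   UTF8String retainedKey = recordKey.copy();
   ```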



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {

Review Comment:
   Is this refactored based on `hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java`?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {

Review Comment:
   Is this code copied from Spark OSS?  Wondering if it works across Spark versions.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
+   *
+   * <ol>
+   *   <li>Invoking configured [[KeyGenerator]] to produce record key, alas partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions dataset using provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+
+          // TODO use mutable row, avoid re-allocating
+          new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
+        }
+      }
+
+    val metaFields = Seq(

Review Comment:
   nit: use `HoodieRecord.HOODIE_META_COLUMNS` plus a transformation to form the meta fields?
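   A rough sketch of that suggestion (shown in Java for brevity; the helper itself is Scala, so the real change would use the Scala collections equivalent):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import org.apache.hudi.common.model.HoodieRecord;
   import org.apache.spark.sql.types.DataTypes;
   import org.apache.spark.sql.types.Metadata;
   import org.apache.spark.sql.types.StructField;

   class MetaFieldSchema {
     // Derive the meta-field schema from HOODIE_META_COLUMNS instead of listing each field by hand.
     static List<StructField> metaFields() {
       return HoodieRecord.HOODIE_META_COLUMNS.stream()
           .map(name -> new StructField(name, DataTypes.StringType, true, Metadata.empty()))
           .collect(Collectors.toList());
     }
   }
   ```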



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
+   *
+   * <ol>
+   *   <li>Invoking configured [[KeyGenerator]] to produce record key, alas partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions dataset using provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+
+          // TODO use mutable row, avoid re-allocating
+          new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
+        }
+      }
+
+    val metaFields = Seq(
+      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
+
+    val updatedSchema = StructType(metaFields ++ schema.fields)
+    val updatedDF = HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
+
+    if (!populateMetaFields) {
+      updatedDF
+    } else {
+      val trimmedDF = if (dropPartitionColumns) {
+        val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
+        val partitionPathFields = keyGenerator.getPartitionPathFields.asScala
+        val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+        if (nestedPartitionPathFields.nonEmpty) {
+          logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
+        }
+
+        val partitionPathCols = partitionPathFields -- nestedPartitionPathFields
+        updatedDF.drop(partitionPathCols: _*)
+      } else {
+        updatedDF
+      }
+
+      val dedupedDF = if (config.shouldCombineBeforeInsert) {
+        dedupeRows(trimmedDF, config.getPreCombineField, isGlobalIndex)
+      } else {
+        trimmedDF
+      }
+
+      partitioner.repartitionRecords(dedupedDF, config.getBulkInsertShuffleParallelism)
+    }
+  }
+
+  private def dedupeRows(df: DataFrame, preCombineFieldRef: String, isGlobalIndex: Boolean): DataFrame = {
+    val recordKeyMetaFieldOrd = df.schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+    val partitionPathMetaFieldOrd = df.schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+    // NOTE: Pre-combine field could be a nested field
+    val preCombineFieldPath = composeNestedFieldPath(df.schema, preCombineFieldRef)
+
+    val dedupedRdd = df.queryExecution.toRdd
+      .map { row =>
+        val rowKey = if (isGlobalIndex) {
+          row.getString(recordKeyMetaFieldOrd)
+        } else {
+          val partitionPath = row.getString(partitionPathMetaFieldOrd)
+          val recordKey = row.getString(recordKeyMetaFieldOrd)
+          s"$partitionPath:$recordKey"
+        }
+        // NOTE: It's critical whenever we keep the reference to the row, to make a copy
+        //       since Spark might be providing us with a mutable copy (updated during the iteration)
+        (rowKey, row.copy())

Review Comment:
   is `row.copy()` needed here for `reduceByKey`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org