Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/04 18:38:04 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #5470: [WIP][HUDI-3993][Perf] Replacing UDF in Bulk Insert w/ RDD transformation

nsivabalan commented on code in PR #5470:
URL: https://github.com/apache/hudi/pull/5470#discussion_r864931185


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into a Hudi table, taking the following steps:
+   *
+   * <ol>
+   *   <li>Invokes the configured [[KeyGenerator]] to produce the record key, as well as the partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions the dataset using the provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>

Review Comment:
   Does this `toRdd` incur any perf hit? If yes, can you run a benchmark comparing the UDF-based approach against this one and report what you see? Alternatively, you could run a benchmark with a raw parquet write using the bulk-insert row writer (non-partitioned, no sort mode) and make sure we see comparable numbers with this patch.
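   For reference, a minimal micro-benchmark along these lines could look like the sketch below (assuming a local SparkSession; the row count, schema, and the identity key-extraction UDF are placeholders, not the PR's actual key generator):

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.udf

   object KeyGenBenchSketch {
     // Crude wall-clock timer; good enough for a first comparison.
     def time[T](label: String)(block: => T): T = {
       val start = System.nanoTime()
       val result = block
       println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
       result
     }

     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("keygen-bench").getOrCreate()

       val df = spark.range(10000000L)
         .selectExpr("cast(id as string) as id", "cast(id % 100 as string) as part")
         .cache()
       df.count() // materialize the cache so both variants read the same input

       // Variant 1: per-row UDF evaluated through the expression engine
       val keyUdf = udf((id: String) => id)
       time("udf") {
         df.withColumn("key", keyUdf(df("id"))).queryExecution.toRdd.count()
       }

       // Variant 2: RDD-level transformation over InternalRows, as in this patch
       time("toRdd + mapPartitions") {
         df.queryExecution.toRdd.mapPartitions(iter => iter.map(identity)).count()
       }

       spark.stop()
     }
   }
   ```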



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into a Hudi table, taking the following steps:
+   *
+   * <ol>
+   *   <li>Invokes the configured [[KeyGenerator]] to produce the record key, as well as the partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions the dataset using the provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        lazy val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+          // To minimize # of allocations, we're going to allocate a single array
+          // setting all column values in place for the updated row
+          val newColVals = new Array[Any](schema.fields.length + HoodieRecord.HOODIE_META_COLUMNS.size)
+          // NOTE: The order of the fields has to match that of `HoodieRecord.HOODIE_META_COLUMNS`
+          newColVals.update(0, commitTimestamp)
+          newColVals.update(1, commitSeqNo)
+          newColVals.update(2, recordKey)
+          newColVals.update(3, partitionPath)
+          newColVals.update(4, filename)
+          // Prepend existing row column values
+          row.toSeq(schema).copyToArray(newColVals, 5)
+          new GenericInternalRow(newColVals)
+        }
+      }
+
+    val metaFields = Seq(
+      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
+
+    val updatedSchema = StructType(metaFields ++ schema.fields)
+    val updatedDF = HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
+
+    if (!populateMetaFields) {
+      updatedDF

Review Comment:
   This was probably a gap before, but we may not have to support `dropPartitionColumns` even with the virtual-key code path. Can we fix that please?
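   For example, if the intent is to disallow that combination outright, a minimal guard could be (a sketch; where the validation ultimately lives is an open question):

   ```scala
   // Hypothetical validation: with meta-fields disabled (virtual keys), the
   // partition path can no longer be re-derived from the written rows, so
   // dropping the partition columns would lose information.
   def validateBulkInsertOptions(populateMetaFields: Boolean, dropPartitionColumns: Boolean): Unit = {
     if (!populateMetaFields && dropPartitionColumns) {
       throw new IllegalArgumentException(
         "Dropping partition columns is not supported when meta-fields are disabled (virtual keys)")
     }
   }
   ```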



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -66,12 +66,26 @@ protected BuiltinKeyGenerator(TypedProperties config) {
   @Override
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
+    // TODO avoid conversion to avro
+    //      since converterFn is transient this will be repeatedly initialized over and over again
     if (null == converterFn) {
       converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
     }
     return getKey(converterFn.apply(row)).getRecordKey();
   }
 
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public String getRecordKey(InternalRow internalRow, StructType schema) {
+    try {

Review Comment:
   With the changes in my other patch, we don't need to deserialize to `Row` to fetch the value. Can you take a look?
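   To illustrate, with the nested-field utilities the Scala helper above already imports, a key field could be read straight from the `InternalRow` roughly like this (signatures inferred from the test file below; the exact wiring into `BuiltinKeyGenerator` is an assumption):

   ```scala
   import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.types.StructType

   // Resolve the (possibly nested) field path once, then read values per row
   // without any InternalRow -> Row -> Avro round-trip.
   def recordKeyFromInternalRow(row: InternalRow, schema: StructType, keyField: String): String = {
     val fieldPath = composeNestedFieldPath(schema, keyField)
     String.valueOf(getNestedInternalRowValue(row, fieldPath))
   }
   ```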



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java:
##########
@@ -51,6 +61,24 @@ public String getRecordKey(GenericRecord record) {
     return nonpartitionedAvroKeyGenerator.getRecordKey(record);
   }
 
+  @Override
+  public String getRecordKey(Row row) {

Review Comment:
   Shouldn't we migrate this fix to `SimpleKeyGenerator` as well, if you feel the existing impl there could be fixed? Why make the change only in `NonpartitionedKeyGenerator`?
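   To make that concrete, a `Row`-based fast path in `SimpleKeyGenerator` could look roughly like the sketch below (field lookup and null handling are assumptions, not the PR's code):

   ```scala
   import org.apache.spark.sql.Row

   // Hypothetical: read the single record-key field straight off the Row instead
   // of converting the whole Row to Avro first.
   def simpleRecordKey(row: Row, recordKeyField: String): String = {
     val ord = row.schema.fieldIndex(recordKeyField)
     if (row.isNullAt(ord)) {
       throw new IllegalStateException(s"Record key field '$recordKeyField' cannot be null")
     }
     row.get(ord).toString
   }
   ```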



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -153,7 +152,37 @@ public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
     return writeStatusList;
   }
 
-  public void abort() {
+  public void abort() {}
+
+  public void close() throws IOException {
+    for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+      writeStatusList.add(rowCreateHandle.close());
+    }
+    handles.clear();
+    handle = null;
+  }
+
+  private String extractPartitionPath(InternalRow row) {
+    String partitionPath;
+    if (populateMetaFields) {
+      // In case meta-fields are materialized w/in the table itself, we can just simply extract
+      // partition path from there
+      //
+      // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as [[UTF8String]] to avoid
+      //       conversion from Catalyst internal representation into a [[String]]
+      partitionPath = row.getString(
+          HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD));

Review Comment:
   We can directly use 3 here instead of looking it up in the hashmap.
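   And if a bare `3` feels too magic, a named constant keeps it readable while still avoiding the per-row map lookup (constant and object names below are hypothetical):

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow

   object MetaFieldOrdinals {
     // Ordinals mirror HoodieRecord.HOODIE_META_COLUMNS:
     // _hoodie_commit_time=0, _hoodie_commit_seqno=1, _hoodie_record_key=2,
     // _hoodie_partition_path=3, _hoodie_file_name=4
     val PartitionPathOrd: Int = 3

     def partitionPathOf(row: InternalRow): String =
       row.getUTF8String(PartitionPathOrd).toString
   }
   ```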



##########
hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/TestHoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.Test
+
+class TestHoodieUnsafeRowUtils {
+
+  @Test
+  def testComposeNestedFieldPath(): Unit = {
+    val schema = StructType(Seq(
+      StructField("foo", StringType),
+      StructField(
+        name = "bar",
+        dataType = StructType(Seq(
+          StructField("baz", DateType),
+          StructField("bor", LongType)
+        ))
+      )
+    ))
+
+    assertEquals(
+      Seq((1, schema(1)), (0, schema(1).dataType.asInstanceOf[StructType](0))),
+      composeNestedFieldPath(schema, "bar.baz").toSeq)
+
+    assertThrows(classOf[IllegalArgumentException]) { () =>
+      composeNestedFieldPath(schema, "foo.baz")
+    }
+  }
+
+  @Test
+  def testGetNestedRowValue(): Unit = {
+    val schema = StructType(Seq(

Review Comment:
   Minor: if you intend to use the same schema in many tests, we can make it an instance variable instead of declaring it in every test. It's immutable, so we could even make it static final.
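   Something like the following would do it (a sketch; in Scala, a `val` in the test class's companion object is the closest analogue to static final):

   ```scala
   import org.apache.spark.sql.types._

   object TestHoodieUnsafeRowUtils {
     // Shared, immutable fixture declared once for all tests
     val NestedSchema: StructType = StructType(Seq(
       StructField("foo", StringType),
       StructField("bar", StructType(Seq(
         StructField("baz", DateType),
         StructField("bor", LongType))))
     ))
   }
   ```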



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.BuiltinKeyGenerator
+import org.apache.hudi.table.BulkInsertPartitioner
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object HoodieDatasetBulkInsertHelper extends Logging {
+
+  /**
+   * Prepares [[DataFrame]] for bulk-insert into a Hudi table, taking the following steps:
+   *
+   * <ol>
+   *   <li>Invokes the configured [[KeyGenerator]] to produce the record key, as well as the partition-path value</li>
+   *   <li>Prepends Hudi meta-fields to every row in the dataset</li>
+   *   <li>Dedupes rows (if necessary)</li>
+   *   <li>Partitions the dataset using the provided [[partitioner]]</li>
+   * </ol>
+   */
+  def prepareForBulkInsert(df: DataFrame,
+                           config: HoodieWriteConfig,
+                           partitioner: BulkInsertPartitioner[Dataset[Row]],
+                           isGlobalIndex: Boolean,
+                           dropPartitionColumns: Boolean): Dataset[Row] = {
+    val populateMetaFields = config.populateMetaFields()
+    val schema = df.schema
+
+    val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME,
+      "Key-generator class name is required")
+
+    val prependedRdd: RDD[InternalRow] =
+      df.queryExecution.toRdd.mapPartitions { iter =>
+        lazy val keyGenerator =
+          ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
+            .asInstanceOf[BuiltinKeyGenerator]
+
+        iter.map { row =>
+          val (recordKey, partitionPath) =
+            if (populateMetaFields) {
+              (UTF8String.fromString(keyGenerator.getRecordKey(row, schema)),
+                UTF8String.fromString(keyGenerator.getPartitionPath(row, schema)))
+            } else {
+              (UTF8String.EMPTY_UTF8, UTF8String.EMPTY_UTF8)
+            }
+          val commitTimestamp = UTF8String.EMPTY_UTF8
+          val commitSeqNo = UTF8String.EMPTY_UTF8
+          val filename = UTF8String.EMPTY_UTF8
+          // To minimize # of allocations, we're going to allocate a single array
+          // setting all column values in place for the updated row
+          val newColVals = new Array[Any](schema.fields.length + HoodieRecord.HOODIE_META_COLUMNS.size)
+          // NOTE: The order of the fields has to match that of `HoodieRecord.HOODIE_META_COLUMNS`
+          newColVals.update(0, commitTimestamp)
+          newColVals.update(1, commitSeqNo)
+          newColVals.update(2, recordKey)
+          newColVals.update(3, partitionPath)
+          newColVals.update(4, filename)
+          // Prepend existing row column values
+          row.toSeq(schema).copyToArray(newColVals, 5)
+          new GenericInternalRow(newColVals)
+        }
+      }
+
+    val metaFields = Seq(
+      StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
+      StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))
+
+    val updatedSchema = StructType(metaFields ++ schema.fields)
+    val updatedDF = HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
+
+    if (!populateMetaFields) {
+      updatedDF
+    } else {
+      val trimmedDF = if (dropPartitionColumns) {
+        val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
+        val partitionPathFields = keyGenerator.getPartitionPathFields.asScala
+        val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+        if (nestedPartitionPathFields.nonEmpty) {
+          logWarning(s"Cannot drop nested partition path fields: $nestedPartitionPathFields")
+        }
+
+        val partitionPathCols = partitionPathFields -- nestedPartitionPathFields
+        updatedDF.drop(partitionPathCols: _*)
+      } else {
+        updatedDF
+      }
+
+      val dedupedDF = if (config.shouldCombineBeforeInsert) {
+        dedupeRows(trimmedDF, config.getPreCombineField, isGlobalIndex)
+      } else {
+        trimmedDF
+      }
+
+      partitioner.repartitionRecords(dedupedDF, config.getBulkInsertShuffleParallelism)
+    }
+  }
+
+  private def dedupeRows(df: DataFrame, preCombineFieldRef: String, isGlobalIndex: Boolean): DataFrame = {
+    val recordKeyMetaFieldOrd = df.schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)

Review Comment:
   This may also need to be fixed for the virtual-key path, or we can call out that it's not supported for now. Even prior to this patch, we did have support for de-duping in the virtual-key flow in the row writer.
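   If we do want to support it, one option is to dedupe on the record-key source columns themselves rather than the `_hoodie_record_key` meta column, e.g. (a sketch; the column wiring and the window-based dedupe are assumptions, not the PR's implementation):

   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.expressions.Window
   import org.apache.spark.sql.functions.{col, row_number}

   // Keep the row with the highest precombine value per key; keyCols would be the
   // key generator's record-key source fields when meta-fields are disabled.
   def dedupeByColumns(df: DataFrame, keyCols: Seq[String], preCombineField: String): DataFrame = {
     val window = Window.partitionBy(keyCols.map(col): _*).orderBy(col(preCombineField).desc)
     df.withColumn("_row_num", row_number().over(window))
       .where(col("_row_num") === 1)
       .drop("_row_num")
   }
   ```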



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org