Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/01/29 05:07:05 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089793370


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -505,31 +495,6 @@ object HoodieSparkSqlWriter {
     HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
   }
 
-  def generateSparkSchemaWithoutPartitionColumns(partitionParam: String, schema: StructType): StructType = {

Review Comment:
   Dead code



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -82,12 +84,11 @@ object HoodieSparkSqlWriter {
             optParams: Map[String, String],
             df: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
-            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
-  : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
-    SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,

Review Comment:
   This was just a search-and-replace removing the invalid type references: the `SparkRDDWriteClient[HoodieRecordPayload[Nothing]]` signatures are replaced with the existential `SparkRDDWriteClient[_]` (see the sketch below).
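   
   A minimal, hedged sketch of the typing issue (the class and method names below are stand-ins for illustration, not the actual Hudi types): pinning the parameter to a concrete instantiation like `Nothing` rejects every real client, since the type is invariant, whereas the existential wildcard `_` accepts any instantiation.
   
   ```scala
   // Stand-in for a client parameterized by its record/payload type.
   class WriteClient[T]
   
   object WildcardSketch {
     // Before: a bogus concrete instantiation -- no real client can satisfy it,
     // because WriteClient is invariant in T.
     def writeOld(client: Option[WriteClient[Nothing]] = Option.empty): Unit = ()
   
     // After: "a client of *some* record type", which is all the method needs to know.
     def writeNew(client: Option[WriteClient[_]] = Option.empty): Unit = ()
   
     def main(args: Array[String]): Unit = {
       val client = new WriteClient[String]
       // writeOld(Some(client))  // does not compile: WriteClient[String] is not WriteClient[Nothing]
       writeNew(Some(client))     // compiles: the wildcard accepts any instantiation
     }
   }
   ```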



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -308,14 +175,351 @@ object HoodieInternalRowUtils {
         }
       case _ =>
     }
+
     if (value == None) {
       throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
     } else {
       CatalystTypeConverters.convertToCatalyst(value)
     }
   }
+   */
+  
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeRowWriterRenaming(prevStructType: StructType, newStructType: StructType, renamedColumnsMap: JMap[String, String], fieldNames: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNames.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNames), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNames)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNames), prevFieldPos)
+
+              case None =>
+                // TODO handle defaults
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNames.pop()
+    }
+
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
+      }
+    }
+  }
+
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNames: JDeque[String]): RowFieldUpdater = {
+    (prevDataType, newDataType) match {
+      case (prevType, newType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (prevStructType: StructType, newStructType: StructType) =>
+        val writer = genUnsafeRowWriterRenaming(prevStructType, newStructType, renamedColumnsMap, fieldNames)
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // TODO elaborate

Review Comment:
   Note to self to explain this particular piece; a simplified sketch of the overall approach follows below.
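   
   A hedged, self-contained sketch of the approach (all names below are invented for illustration; the real code works on Catalyst `InternalRow`s, recurses into nested `StructType`s, and matches renames via fully-qualified field names in `renamedColumnsMap`): resolve each new-schema field to its old-schema position once, up front, so that rewriting each row is a plain positional copy loop.
   
   ```scala
   object RenamingWriterSketch {
     final case class Field(name: String)
     final case class Schema(fields: Array[Field]) {
       def indexOf(name: String): Option[Int] = {
         val i = fields.indexWhere(_.name == name)
         if (i >= 0) Some(i) else None
       }
     }
   
     // Returns a function rewriting a row laid out per `prev` into the `next` layout.
     def genWriter(prev: Schema, next: Schema, renamed: Map[String, String]): Array[Any] => Array[Any] = {
       // Resolved once, outside the per-row hot path: new position -> old position (-1 = absent).
       val positionMap: Array[Int] = next.fields.map { f =>
         prev.indexOf(f.name)                                 // same name in both schemas
           .orElse(renamed.get(f.name).flatMap(prev.indexOf)) // renamed: look up the old name
           .getOrElse(-1)                                     // newly added field: null it out
       }
       oldRow => positionMap.map(p => if (p >= 0) oldRow(p) else null)
     }
   
     def main(args: Array[String]): Unit = {
       val prev  = Schema(Array(Field("id"), Field("fare")))
       val next  = Schema(Array(Field("id"), Field("amount"), Field("tip")))
       val write = genWriter(prev, next, renamed = Map("amount" -> "fare"))
       println(write(Array[Any](1, 12.5)).mkString(", ")) // prints: 1, 12.5, null
     }
   }
   ```
   
   The actual `genUnsafeRowWriterRenaming` additionally composes a per-field `RowFieldUpdater` for each resolved position, so type conversions and nested-struct rewrites happen in the same pass as the positional copy.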


