You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/09 12:38:03 UTC

[hudi] branch master updated: [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c608dbd6c2 [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
c608dbd6c2 is described below

commit c608dbd6c2d00f29f350ad1ed0fd04f7eb3c1b6b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Jun 9 20:37:58 2022 +0800

    [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  | 34 +++++++++++++---------
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  9 +++---
 2 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 0102870e92..819c4b55a9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -323,22 +323,12 @@ object DataSourceWriteOptions {
   val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE
 
   /**
-    * Key generator class, that implements will extract the key out of incoming record
-    *
+    * Key generator class, that implements will extract the key out of incoming record.
     */
   val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
-    if (!p.contains(PARTITIONPATH_FIELD)) {
-      Option.of(classOf[NonpartitionedKeyGenerator].getName)
-    } else {
-      val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
-      val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
-      if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
-        Option.of(classOf[SimpleKeyGenerator].getName)
-      } else {
-        Option.of(classOf[ComplexKeyGenerator].getName)
-      }
-    }
+    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
   })
+
   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.keygenerator.class")
     .defaultValue(classOf[SimpleKeyGenerator].getName)
@@ -804,6 +794,22 @@ object DataSourceOptionsHelper {
     ) ++ translateConfigurations(parameters)
   }
 
+  def inferKeyGenClazz(props: TypedProperties): String = {
+    val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
+    if (partitionFields != null) {
+      val numPartFields = partitionFields.split(",").length
+      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+      val numRecordKeyFields = recordsKeyFields.split(",").length
+      if (numPartFields == 1 && numRecordKeyFields == 1) {
+        classOf[SimpleKeyGenerator].getName
+      } else {
+        classOf[ComplexKeyGenerator].getName
+      }
+    } else {
+      classOf[NonpartitionedKeyGenerator].getName
+    }
+  }
+
   implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
     new JavaFunction[From, To] {
       override def apply (input: From): To = function (input)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 9d139389fd..798ed84b09 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceOptionsHelper
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
@@ -113,14 +114,14 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
     } else partitionPath
   }
 
-  override def getPartitionPath(record: GenericRecord) = {
+  override def getPartitionPath(record: GenericRecord): String = {
     val partitionPath = super.getPartitionPath(record)
-    convertPartitionPathToSqlType(partitionPath, false)
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
   }
 
   override def getPartitionPath(row: Row): String = {
     val partitionPath = super.getPartitionPath(row)
-    convertPartitionPathToSqlType(partitionPath, true)
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
   }
 }
 
@@ -135,7 +136,7 @@ object SqlKeyGenerator {
     if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
       HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
     } else {
-      classOf[ComplexKeyGenerator].getCanonicalName
+      DataSourceOptionsHelper.inferKeyGenClazz(props)
     }
   }
 }