You are viewing a plain text version of this content. The canonical link for it (originally a hyperlink on the word "here") is not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/06/09 12:38:03 UTC
[hudi] branch master updated: [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c608dbd6c2 [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
c608dbd6c2 is described below
commit c608dbd6c2d00f29f350ad1ed0fd04f7eb3c1b6b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Jun 9 20:37:58 2022 +0800
[HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 34 +++++++++++++---------
.../spark/sql/hudi/command/SqlKeyGenerator.scala | 9 +++---
2 files changed, 25 insertions(+), 18 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 0102870e92..819c4b55a9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
@@ -323,22 +323,12 @@ object DataSourceWriteOptions {
val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE
/**
- * Key generator class, that implements will extract the key out of incoming record
- *
+ * Key generator class that extracts the key out of an incoming record.
*/
val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
- if (!p.contains(PARTITIONPATH_FIELD)) {
- Option.of(classOf[NonpartitionedKeyGenerator].getName)
- } else {
- val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
- val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
- if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
- Option.of(classOf[SimpleKeyGenerator].getName)
- } else {
- Option.of(classOf[ComplexKeyGenerator].getName)
- }
- }
+ Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
})
+
val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.defaultValue(classOf[SimpleKeyGenerator].getName)
@@ -804,6 +794,22 @@ object DataSourceOptionsHelper {
) ++ translateConfigurations(parameters)
}
+  /** Infers the key generator class from the configured partition-path and record-key fields. */
+  def inferKeyGenClazz(props: TypedProperties): String = {
+    val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
+    // A missing OR blank partition-path field means non-partitioned ("".split(",") has length 1).
+    if (partitionFields == null || partitionFields.trim.isEmpty) {
+      classOf[NonpartitionedKeyGenerator].getName
+    } else {
+      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+      if (partitionFields.split(",").length == 1 && recordsKeyFields.split(",").length == 1) {
+        classOf[SimpleKeyGenerator].getName
+      } else {
+        classOf[ComplexKeyGenerator].getName
+      }
+    }
+  }
+
implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
new JavaFunction[From, To] {
override def apply (input: From): To = function (input)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 9d139389fd..798ed84b09 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi.command
import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceOptionsHelper
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig
@@ -113,14 +114,14 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
} else partitionPath
}
- override def getPartitionPath(record: GenericRecord) = {
+ override def getPartitionPath(record: GenericRecord): String = {
val partitionPath = super.getPartitionPath(record)
- convertPartitionPathToSqlType(partitionPath, false)
+ convertPartitionPathToSqlType(partitionPath, rowType = false)
}
override def getPartitionPath(row: Row): String = {
val partitionPath = super.getPartitionPath(row)
- convertPartitionPathToSqlType(partitionPath, true)
+ convertPartitionPathToSqlType(partitionPath, rowType = true)
}
}
@@ -135,7 +136,7 @@ object SqlKeyGenerator {
if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
} else {
- classOf[ComplexKeyGenerator].getCanonicalName
+ DataSourceOptionsHelper.inferKeyGenClazz(props)
}
}
}