Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/16 05:58:10 UTC
[hudi] branch master updated: [HUDI-4813] Fix key generator inference not working on the Spark SQL side (#6634)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3faddb7da0 [HUDI-4813] Fix key generator inference not working on the Spark SQL side (#6634)
3faddb7da0 is described below
commit 3faddb7da09e5e11d1b126ba49cea4ebdeba8fc7
Author: FocusComputing <xi...@gmail.com>
AuthorDate: Fri Sep 16 13:58:05 2022 +0800
[HUDI-4813] Fix key generator inference not working on the Spark SQL side (#6634)
* [HUDI-4813] Fix key generator inference not working on the Spark SQL side
Co-authored-by: xiaoxingstack <xi...@didiglobal.com>
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 10 ++-
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 9 ++-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../apache/spark/sql/hudi/TestCreateTable.scala | 74 +++++++++++++++++++++-
4 files changed, 86 insertions(+), 9 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index c694174b8c..e8ffb09ff9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.{ConfigProperty, DFSPropertiesConfiguration
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.util.Option
+import org.apache.hudi.common.util.{Option, StringUtils}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
@@ -787,9 +787,13 @@ object DataSourceOptionsHelper {
def inferKeyGenClazz(props: TypedProperties): String = {
val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
- if (partitionFields != null) {
+ val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+ inferKeyGenClazz(recordsKeyFields, partitionFields)
+ }
+
+ def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): String = {
+ if (!StringUtils.isNullOrEmpty(partitionFields)) {
val numPartFields = partitionFields.split(",").length
- val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
val numRecordKeyFields = recordsKeyFields.split(",").length
if (numPartFields == 1 && numRecordKeyFields == 1) {
classOf[SimpleKeyGenerator].getName
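The hunk above shows only the SimpleKeyGenerator branch of the new two-argument overload. As a minimal self-contained sketch of the complete inference rule, with the remaining branches reconstructed from the key generators asserted in the new tests further down (not the exact committed code):

  import org.apache.hudi.common.util.StringUtils
  import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}

  // Pick a key generator class from the shape of the record key and
  // partition path field lists (both comma-separated strings).
  def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): String = {
    if (!StringUtils.isNullOrEmpty(partitionFields)) {
      val numPartFields = partitionFields.split(",").length
      val numRecordKeyFields = recordsKeyFields.split(",").length
      if (numPartFields == 1 && numRecordKeyFields == 1) {
        // exactly one record key and one partition field
        classOf[SimpleKeyGenerator].getName
      } else {
        // multiple record keys or partition fields (branch reconstructed
        // from the dual-record-key test case below)
        classOf[ComplexKeyGenerator].getName
      }
    } else {
      // no partition fields at all (branch reconstructed from the
      // non-partitioned test case below)
      classOf[NonpartitionedKeyGenerator].getName
    }
  }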
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 09981e845a..f135772320 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -17,19 +17,19 @@
package org.apache.spark.sql.catalyst.catalog
-import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.DataSourceWriteOptions.OPERATION
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
-import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -288,7 +288,10 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
} else {
- extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
+ val primaryKeys = table.properties.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName).getOrElse(SQL_KEY_TABLE_PRIMARY_KEY.defaultValue.get)
+ val partitions = table.partitionColumnNames.mkString(",")
+ extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
+ DataSourceOptionsHelper.inferKeyGenClazz(primaryKeys, partitions)
}
extraConfig.toMap
}
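With this change the catalog no longer hard-codes ComplexKeyGenerator: it feeds the SQL primaryKey property and the partition column list into the shared helper. A short usage sketch of the expected results (field names are hypothetical, chosen to mirror the test cases below):

  import org.apache.hudi.DataSourceOptionsHelper

  DataSourceOptionsHelper.inferKeyGenClazz("id", "")        // org.apache.hudi.keygen.NonpartitionedKeyGenerator
  DataSourceOptionsHelper.inferKeyGenClazz("id", "ts")      // org.apache.hudi.keygen.SimpleKeyGenerator
  DataSourceOptionsHelper.inferKeyGenClazz("id,name", "ts") // org.apache.hudi.keygen.ComplexKeyGenerator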
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 89f0acaf01..60d870a05f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -874,7 +874,7 @@ class TestHoodieSparkSqlWriter {
.setBasePath(tablePath1).build().getTableConfig
assert(tableConfig1.getHiveStylePartitioningEnable == "true")
assert(tableConfig1.getUrlEncodePartitioning == "false")
- assert(tableConfig1.getKeyGeneratorClassName == classOf[ComplexKeyGenerator].getName)
+ assert(tableConfig1.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName)
df.write.format("hudi")
.options(options)
.option(HoodieWriteConfig.TBL_NAME.key, tableName1)
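The flipped assertion above covers the DataFrame write path: a write with a single record key and a single partition field now infers SimpleKeyGenerator rather than ComplexKeyGenerator. A minimal sketch of such a write, assuming a SparkSession `spark`, a DataFrame `df` with `id`, `dt`, and `ts` columns, and a scratch `tablePath` (all hypothetical):

  import org.apache.hudi.DataSourceWriteOptions
  import org.apache.hudi.common.table.HoodieTableMetaClient
  import org.apache.hudi.keygen.SimpleKeyGenerator

  // Write with one record key field and one partition path field,
  // leaving the key generator class unset so it gets inferred.
  df.write.format("hudi")
    .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "dt")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
    .option("hoodie.table.name", "infer_keygen_demo")
    .mode("overwrite")
    .save(tablePath)

  // The persisted table config should now record SimpleKeyGenerator.
  val tableConfig = HoodieTableMetaClient.builder()
    .setConf(spark.sessionState.newHadoopConf())
    .setBasePath(tablePath)
    .build()
    .getTableConfig
  assert(tableConfig.getKeyGeneratorClassName == classOf[SimpleKeyGenerator].getName)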
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index d3dbf9a6e6..6a6b41da7f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types._
-
import org.junit.jupiter.api.Assertions.assertFalse
import scala.collection.JavaConverters._
@@ -137,7 +136,7 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
assertResult("dt")(tableConfig(HoodieTableConfig.PARTITION_FIELDS.key))
assertResult("id")(tableConfig(HoodieTableConfig.RECORDKEY_FIELDS.key))
assertResult("ts")(tableConfig(HoodieTableConfig.PRECOMBINE_FIELD.key))
- assertResult(classOf[ComplexKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
+ assertResult(classOf[SimpleKeyGenerator].getCanonicalName)(tableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
assertResult("default")(tableConfig(HoodieTableConfig.DATABASE_NAME.key()))
assertResult(tableName)(tableConfig(HoodieTableConfig.NAME.key()))
assertFalse(tableConfig.contains(OPERATION.key()))
@@ -944,4 +943,75 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
spark.sql("use default")
}
+
+ test("Test Infer KegGenClazz") {
+ def checkKeyGenerator(targetGenerator: String, tableName: String) = {
+ val tablePath = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location.getPath
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ val realKeyGenerator =
+ metaClient.getTableConfig.getProps.asScala.toMap.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key).get
+ assertResult(targetGenerator)(realKeyGenerator)
+ }
+
+ val tableName = generateTableName
+
+ // Test Nonpartitioned table
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | comment "This is a simple hudi table"
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ checkKeyGenerator("org.apache.hudi.keygen.NonpartitionedKeyGenerator", tableName)
+ spark.sql(s"drop table $tableName")
+
+ // Test single partitioned table
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | comment "This is a simple hudi table"
+ | partitioned by (ts)
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ checkKeyGenerator("org.apache.hudi.keygen.SimpleKeyGenerator", tableName)
+ spark.sql(s"drop table $tableName")
+
+ // Test single partitioned table with dual record keys
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | comment "This is a simple hudi table"
+ | partitioned by (ts)
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
+ spark.sql(s"drop table $tableName")
+ }
}