Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:16 UTC
[hudi] 09/10: [HUDI-4237] Fixing empty partition-values being sync'd to HMS (#6821)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6fee77b76fa25be677b324804eae9801ca6b9f4c
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Sep 29 09:07:56 2022 -0700
[HUDI-4237] Fixing empty partition-values being sync'd to HMS (#6821)
Co-authored-by: dujunling <du...@bytedance.com>
Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
.../hudi/common/table/HoodieTableConfig.java | 6 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 13 +-
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 2 +-
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 8 +-
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 6 +-
.../spark/sql/hudi/command/SqlKeyGenerator.scala | 131 +++++++++++++--------
.../hudi/command/MergeIntoHoodieTableCommand.scala | 2 +-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../sql/hudi/procedure/TestRepairsProcedure.scala | 35 +++++-
9 files changed, 138 insertions(+), 67 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 6b64ec4897..89d01b53a6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -501,9 +501,13 @@ public class HoodieTableConfig extends HoodieConfig {
/**
* @returns the partition field prop.
+ * @deprecated please use {@link #getPartitionFields()} instead
*/
+ @Deprecated
public String getPartitionFieldProp() {
- return getString(PARTITION_FIELDS);
+ // NOTE: We're adding a stub returning empty string to stay compatible w/ pre-existing
+ // behavior until this method is fully deprecated
+ return Option.ofNullable(getString(PARTITION_FIELDS)).orElse("");
}
/**
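Note on the hunk above: getPartitionFieldProp() previously returned whatever getString(PARTITION_FIELDS) produced, which is null for a non-partitioned table; the stub now degrades that null to an empty string to stay compatible with pre-existing callers. A minimal sketch of the same fallback in Scala, assuming only that Hudi's Option.ofNullable/orElse mirror their java.util.Optional counterparts:

    import org.apache.hudi.common.util.Option

    // Illustrative helper (not part of the patch): a null raw config value
    // degrades to "" instead of propagating null to callers.
    def partitionFieldsOrEmpty(raw: String): String =
      Option.ofNullable(raw).orElse("")

    partitionFieldsOrEmpty(null)         // ""
    partitionFieldsOrEmpty("dt,country") // "dt,country"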
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 6370a0752e..9ce5086322 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -31,6 +31,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.ConfigUtils
+import org.apache.hudi.util.JFunction
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
@@ -321,14 +322,14 @@ object DataSourceWriteOptions {
/**
* Key generator class, that implements will extract the key out of incoming record.
*/
- val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
- Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
+ val keyGeneratorInferFunc = JFunction.toJavaFunction((config: HoodieConfig) => {
+ Option.of(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
})
val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.keygenerator.class")
.defaultValue(classOf[SimpleKeyGenerator].getName)
- .withInferFunction(keyGeneraterInferFunc)
+ .withInferFunction(keyGeneratorInferFunc)
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`")
val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
@@ -801,12 +802,6 @@ object DataSourceOptionsHelper {
}
}
- implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
- new JavaFunction[From, To] {
- override def apply (input: From): To = function (input)
- }
- }
-
implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
checkState(prop.hasDefaultValue)
var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
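The implicit conversion removed above is superseded by the shared org.apache.hudi.util.JFunction utility imported earlier in this diff. For reference, a self-contained sketch of such a Scala-to-Java function adapter (a hypothetical standalone version, not the actual JFunction source):

    import java.util.function.{Function => JavaFunction}

    // Wraps a Scala function so it can be passed to Java APIs such as
    // ConfigProperty.withInferFunction.
    object FunctionAdapter {
      def toJavaFunction[From, To](f: From => To): JavaFunction[From, To] =
        new JavaFunction[From, To] {
          override def apply(input: From): To = f(input)
        }
    }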
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index aa29bad6b0..335fe68bd2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -112,7 +112,7 @@ object HoodieWriterUtils {
def getOriginKeyGenerator(parameters: Map[String, String]): String = {
val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
- parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
+ parameters.getOrElse(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null)
} else {
kg
}
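To illustrate the two branches above: SQL writes route through the SqlKeyGenerator wrapper and stash the user's real generator class under a side-channel property, which this method unwraps; plain datasource writes return the configured class unchanged. A sketch with illustrative parameter maps (the property keys are the ones appearing elsewhere in this diff):

    // SQL write path: the wrapper is configured, so the original class is
    // recovered from the side-channel property.
    val sqlParams = Map(
      "hoodie.datasource.write.keygenerator.class" ->
        "org.apache.spark.sql.hudi.command.SqlKeyGenerator",
      "hoodie.sql.origin.keygen.class" ->
        "org.apache.hudi.keygen.SimpleKeyGenerator")
    // getOriginKeyGenerator(sqlParams) == "org.apache.hudi.keygen.SimpleKeyGenerator"

    // Datasource write path: the configured class is returned as-is.
    val dsParams = Map(
      "hoodie.datasource.write.keygenerator.class" ->
        "org.apache.hudi.keygen.ComplexKeyGenerator")
    // getOriginKeyGenerator(dsParams) == "org.apache.hudi.keygen.ComplexKeyGenerator"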
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index f135772320..c31cd3b205 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -190,12 +190,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
} else {
val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
+ val partitionColumns = if (table.partitionColumnNames.isEmpty) {
+ null
+ } else {
+ table.partitionColumnNames.mkString(",")
+ }
+
HoodieTableMetaClient.withPropertyBuilder()
.fromProperties(properties)
.setDatabaseName(catalogDatabaseName)
.setTableName(table.identifier.table)
.setTableCreateSchema(schema.toString())
- .setPartitionFields(table.partitionColumnNames.mkString(","))
+ .setPartitionFields(partitionColumns)
.initTable(hadoopConf, tableLocation)
}
}
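This hunk is the crux of the fix: for a non-partitioned table, partitionColumnNames is empty, and mkString(",") on an empty collection yields "" rather than null, so an empty-but-set partition-fields property ended up in hoodie.properties and was later sync'd to HMS as an empty partition value. A quick REPL-style check of the underlying Scala behavior:

    val noPartitions: Seq[String] = Seq.empty
    noPartitions.mkString(",")   // ""  (empty string, not null)

    // Downstream code that splits the stored property then sees one empty
    // field rather than no fields at all:
    "".split(",")                // Array("")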
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index f3f7e66490..332455ea21 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -69,7 +69,7 @@ trait ProvidesHoodieConfig extends Logging {
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
@@ -180,7 +180,7 @@ trait ProvidesHoodieConfig extends Logging {
HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr,
@@ -264,7 +264,7 @@ trait ProvidesHoodieConfig extends Logging {
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
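All three SQL paths touched above (upsert, insert, and delete configs) follow the same stash-and-wrap convention: KEYGENERATOR_CLASS_NAME points at the SqlKeyGenerator wrapper while the table's real generator class rides along under the renamed ORIGINAL_KEYGEN_CLASS_NAME key for the wrapper to consume later. The producer side of that convention, sketched with the literal property keys (the helper name is hypothetical):

    // Illustrative helper: wrap the key generator, stash the original class.
    def withSqlKeyGen(writeConfig: Map[String, String],
                      tableKeyGenClass: String): Map[String, String] =
      writeConfig ++ Map(
        "hoodie.datasource.write.keygenerator.class" ->
          "org.apache.spark.sql.hudi.command.SqlKeyGenerator",
        "hoodie.sql.origin.keygen.class" -> tableKeyGenClass)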
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 798ed84b09..01c995fed4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,24 +18,28 @@
package org.apache.spark.sql.hudi.command
import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.DataSourceOptionsHelper
import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieKey
import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
import org.joda.time.format.DateTimeFormat
import java.sql.Timestamp
+import java.util
+import java.util.Collections
import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
/**
- * A complex key generator for sql command which do some process for the
- * timestamp data type partition field.
+ * Custom Spark-specific [[KeyGenerator]] overriding behavior handling [[TimestampType]] partition values
*/
-class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) {
+class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) {
private lazy val partitionSchema = {
val partitionSchema = props.getString(SqlKeyGenerator.PARTITION_SCHEMA, "")
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
None
}
}
- // The origin key generator class for this table.
- private lazy val originKeyGen = {
- val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
- if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
- val keyGenProps = new TypedProperties()
- keyGenProps.putAll(props)
- keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
- val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
- keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
- Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
- } else {
- None
+
+ private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+ private lazy val originalKeyGen =
+ Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+ .map { originalKeyGenClassName =>
+ checkArgument(originalKeyGenClassName.nonEmpty)
+
+ val convertedKeyGenClassName = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+ val keyGenProps = new TypedProperties()
+ keyGenProps.putAll(props)
+ keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+ keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
+
+ KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+ }
+
+ override def getRecordKey(record: GenericRecord): String =
+ originalKeyGen.map {
+ _.getKey(record).getRecordKey
+ } getOrElse {
+ complexKeyGen.getRecordKey(record)
}
+
+ override def getRecordKey(row: Row): String =
+ originalKeyGen.map {
+ _.getRecordKey(row)
+ } getOrElse {
+ complexKeyGen.getRecordKey(row)
+ }
+
+
+ override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String =
+ originalKeyGen.map {
+ _.getRecordKey(internalRow, schema)
+ } getOrElse {
+ complexKeyGen.getRecordKey(internalRow, schema)
+ }
+
+ override def getPartitionPath(record: GenericRecord): String = {
+ val partitionPath = originalKeyGen.map {
+ _.getKey(record).getPartitionPath
+ } getOrElse {
+ complexKeyGen.getPartitionPath(record)
+ }
+
+ convertPartitionPathToSqlType(partitionPath, rowType = false)
}
- override def getRecordKey(record: GenericRecord): String = {
- if (originKeyGen.isDefined) {
- originKeyGen.get.getKey(record).getRecordKey
- } else {
- super.getRecordKey(record)
+ override def getPartitionPath(row: Row): String = {
+ val partitionPath = originalKeyGen.map {
+ _.getPartitionPath(row)
+ } getOrElse {
+ complexKeyGen.getPartitionPath(row)
}
+
+ convertPartitionPathToSqlType(partitionPath, rowType = true)
}
- override def getRecordKey(row: Row): String = {
- if (originKeyGen.isDefined) {
- originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
- } else {
- super.getRecordKey(row)
+ override def getPartitionPath(internalRow: InternalRow, schema: StructType): UTF8String = {
+ val partitionPath = originalKeyGen.map {
+ _.getPartitionPath(internalRow, schema)
+ } getOrElse {
+ complexKeyGen.getPartitionPath(internalRow, schema)
+ }
+
+ UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString, rowType = true))
+ }
+
+ override def getRecordKeyFieldNames: util.List[String] = {
+ originalKeyGen.map(_.getRecordKeyFieldNames)
+ .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+ }
+
+ override def getPartitionPathFields: util.List[String] = {
+ originalKeyGen.map {
+ case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+ case _ =>
+ Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])
+ } getOrElse {
+ complexKeyGen.getPartitionPathFields
}
}
+ // TODO clean up
private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
if (partitionSchema.isDefined) {
// we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
@@ -113,30 +171,11 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
}
} else partitionPath
}
-
- override def getPartitionPath(record: GenericRecord): String = {
- val partitionPath = super.getPartitionPath(record)
- convertPartitionPathToSqlType(partitionPath, rowType = false)
- }
-
- override def getPartitionPath(row: Row): String = {
- val partitionPath = super.getPartitionPath(row)
- convertPartitionPathToSqlType(partitionPath, rowType = true)
- }
}
object SqlKeyGenerator {
val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
- val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
+ val ORIGINAL_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
-
- def getRealKeyGenClassName(props: TypedProperties): String = {
- val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
- if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
- HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
- } else {
- DataSourceOptionsHelper.inferKeyGenClazz(props)
- }
- }
}
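Taken together, the rewritten SqlKeyGenerator is a delegate-or-fallback wrapper: every record-key and partition-path call goes to the table's original key generator when one was recorded, otherwise to a ComplexKeyGenerator, and the SQL-specific timestamp formatting is applied only to the resulting partition path. The structure in miniature, with stand-in traits for illustration only (the real code uses Hudi's key generator types):

    import org.apache.avro.generic.GenericRecord

    // Minimal stand-in interface for the sketch.
    trait KeyGen {
      def recordKey(record: GenericRecord): String
      def partitionPath(record: GenericRecord): String
    }

    // Prefer the original generator, fall back to the complex one, then
    // post-process the partition path (e.g. convertPartitionPathToSqlType).
    class DelegatingKeyGen(original: Option[KeyGen], fallback: KeyGen,
                           toSqlType: String => String) extends KeyGen {
      def recordKey(record: GenericRecord): String =
        original.getOrElse(fallback).recordKey(record)
      def partitionPath(record: GenericRecord): String =
        toSqlType(original.getOrElse(fallback).partitionPath(record))
    }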
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 17ff34c909..2761a00205 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -534,7 +534,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 60d870a05f..4e4fe43ff9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -1064,7 +1064,7 @@ class TestHoodieSparkSqlWriter {
// for sql write
val m2 = Map(
HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
- SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+ SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
)
val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index f6ce92b415..86dbae5b1e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, SchemaTestUtil}
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
import org.apache.spark.api.java.JavaSparkContext
+import org.junit.jupiter.api.Assertions.assertEquals
import java.io.IOException
import java.net.URL
@@ -109,10 +110,36 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")
// overwrite hoodie props
- val Result = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""").collect()
- assertResult(15) {
- Result.length
- }
+ val expectedOutput ="""
+ |[hoodie.archivelog.folder,archived,archive]
+ |[hoodie.database.name,default,null]
+ |[hoodie.datasource.write.drop.partition.columns,false,false]
+ |[hoodie.datasource.write.hive_style_partitioning,true,null]
+ |[hoodie.datasource.write.partitionpath.urlencode,false,null]
+ |[hoodie.table.checksum,,]
+ |[hoodie.table.create.schema,,]
+ |[hoodie.table.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator,null]
+ |[hoodie.table.name,,]
+ |[hoodie.table.precombine.field,ts,null]
+ |[hoodie.table.recordkey.fields,id,null]
+ |[hoodie.table.type,COPY_ON_WRITE,COPY_ON_WRITE]
+ |[hoodie.table.version,,]
+ |[hoodie.timeline.layout.version,,]""".stripMargin.trim
+
+ val actual = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""")
+ .collect()
+ .map {
+ // omit these properties with variant values
+ case row if row.getString(0).equals("hoodie.table.checksum") => "[hoodie.table.checksum,,]"
+ case row if row.getString(0).equals("hoodie.table.create.schema") => "[hoodie.table.create.schema,,]"
+ case row if row.getString(0).equals("hoodie.table.name") => "[hoodie.table.name,,]"
+ case row if row.getString(0).equals("hoodie.table.version") => "[hoodie.table.version,,]"
+ case row if row.getString(0).equals("hoodie.timeline.layout.version") => "[hoodie.timeline.layout.version,,]"
+ case o => o.toString()
+ }
+ .mkString("\n")
+
+ assertEquals(expectedOutput, actual)
}
}
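The reworked test above no longer asserts only a row count; it renders the full procedure output and blanks the handful of properties whose values vary per run (checksum, create schema, table name, table/timeline versions) before comparing against the expected listing. The normalize-then-compare idea in miniature (the tuple shape is illustrative of the name/value/default columns shown above):

    // Properties whose values change run-to-run and must be blanked first.
    val variantProps = Set("hoodie.table.checksum", "hoodie.table.create.schema",
      "hoodie.table.name", "hoodie.table.version", "hoodie.timeline.layout.version")

    def normalize(rows: Seq[(String, String, String)]): String =
      rows.map {
        case (key, _, _) if variantProps.contains(key) => s"[$key,,]"
        case (key, value, default)                     => s"[$key,$value,$default]"
      }.mkString("\n")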