Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/30 06:39:16 UTC

[hudi] 09/10: [HUDI-4237] Fixing empty partition-values being sync'd to HMS (#6821)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6fee77b76fa25be677b324804eae9801ca6b9f4c
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Thu Sep 29 09:07:56 2022 -0700

    [HUDI-4237] Fixing empty partition-values being sync'd to HMS (#6821)
    
    Co-authored-by: dujunling <du...@bytedance.com>
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../hudi/common/table/HoodieTableConfig.java       |   6 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  13 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   2 +-
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   8 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   6 +-
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   | 131 +++++++++++++--------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   2 +-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   2 +-
 .../sql/hudi/procedure/TestRepairsProcedure.scala  |  35 +++++-
 9 files changed, 138 insertions(+), 67 deletions(-)
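
Background on the symptom this patch targets, with a hedged illustration (this is not the actual HMS-sync code path, just one plausible way an empty partition-fields value leaks downstream):

    // Illustration: splitting an empty partition-fields string does NOT yield
    // an empty array -- it yields a single empty element, which downstream code
    // can mistake for one (empty) partition value.
    val fields = "".split(",")
    assert(fields.sameElements(Array("")))   // length 1, not 0

    // Guarding on emptiness (or never persisting the property for
    // non-partitioned tables, as this patch does) avoids that bogus value.
    val safeFields = Option("").filter(_.nonEmpty).map(_.split(",")).getOrElse(Array.empty[String])
    assert(safeFields.isEmpty)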

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 6b64ec4897..89d01b53a6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -501,9 +501,13 @@ public class HoodieTableConfig extends HoodieConfig {
 
   /**
    * @returns the partition field prop.
+   * @deprecated please use {@link #getPartitionFields()} instead
    */
+  @Deprecated
   public String getPartitionFieldProp() {
-    return getString(PARTITION_FIELDS);
+    // NOTE: We're adding a stub returning empty string to stay compatible w/ pre-existing
+    //       behavior until this method is fully deprecated
+    return Option.ofNullable(getString(PARTITION_FIELDS)).orElse("");
   }
 
   /**
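
The hunk above makes getPartitionFieldProp fall back to an empty string when hoodie.table.partition.fields is unset, instead of returning null, so existing callers that expect a plain String keep working while the method is phased out in favor of getPartitionFields(). A minimal Scala rendering of the same fallback (Scala's Option stands in for org.apache.hudi.common.util.Option; `raw` stands in for getString(PARTITION_FIELDS)):

    // Unset property (null) becomes "", a set property passes through unchanged.
    def partitionFieldProp(raw: String): String = Option(raw).getOrElse("")

    assert(partitionFieldProp(null) == "")
    assert(partitionFieldProp("dt,hh") == "dt,hh")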
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 6370a0752e..9ce5086322 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -31,6 +31,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.ConfigUtils
+import org.apache.hudi.util.JFunction
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
 
@@ -321,14 +322,14 @@ object DataSourceWriteOptions {
   /**
     * Key generator class, that implements will extract the key out of incoming record.
     */
-  val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
-    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
+  val keyGeneratorInferFunc = JFunction.toJavaFunction((config: HoodieConfig) => {
+    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
   })
 
   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.keygenerator.class")
     .defaultValue(classOf[SimpleKeyGenerator].getName)
-    .withInferFunction(keyGeneraterInferFunc)
+    .withInferFunction(keyGeneratorInferFunc)
     .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`")
 
   val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
@@ -801,12 +802,6 @@ object DataSourceOptionsHelper {
     }
   }
 
-  implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
-    new JavaFunction[From, To] {
-      override def apply (input: From): To = function (input)
-    }
-  }
-
   implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
     checkState(prop.hasDefaultValue)
     var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index aa29bad6b0..335fe68bd2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -112,7 +112,7 @@ object HoodieWriterUtils {
   def getOriginKeyGenerator(parameters: Map[String, String]): String = {
     val kg = parameters.getOrElse(KEYGENERATOR_CLASS_NAME.key(), null)
     if (classOf[SqlKeyGenerator].getCanonicalName == kg) {
-      parameters.getOrElse(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
+      parameters.getOrElse(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null)
     } else {
       kg
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index f135772320..c31cd3b205 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -190,12 +190,18 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     } else {
       val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
       val schema = SchemaConverters.toAvroType(finalSchema, false, recordName, namespace)
+      val partitionColumns = if (table.partitionColumnNames.isEmpty) {
+        null
+      } else {
+        table.partitionColumnNames.mkString(",")
+      }
+
       HoodieTableMetaClient.withPropertyBuilder()
         .fromProperties(properties)
         .setDatabaseName(catalogDatabaseName)
         .setTableName(table.identifier.table)
         .setTableCreateSchema(schema.toString())
-        .setPartitionFields(table.partitionColumnNames.mkString(","))
+        .setPartitionFields(partitionColumns)
         .initTable(hadoopConf, tableLocation)
     }
   }
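
On the table-creation side, HoodieCatalogTable now hands null to setPartitionFields for tables without partition columns, so hoodie.table.partition.fields is simply never written rather than being persisted as an empty string. A standalone sketch of that decision (simplified, not the actual property builder):

    // Only materialize the comma-separated partition-fields value when the
    // table actually has partition columns; otherwise leave it null/unset.
    def partitionFieldsOrNull(partitionColumnNames: Seq[String]): String =
      if (partitionColumnNames.isEmpty) null
      else partitionColumnNames.mkString(",")

    assert(partitionFieldsOrNull(Nil) == null)                // non-partitioned table
    assert(partitionFieldsOrNull(Seq("dt", "hh")) == "dt,hh") // partitioned table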
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index f3f7e66490..332455ea21 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -69,7 +69,7 @@ trait ProvidesHoodieConfig extends Logging {
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
@@ -180,7 +180,7 @@ trait ProvidesHoodieConfig extends Logging {
         HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
@@ -264,7 +264,7 @@ trait ProvidesHoodieConfig extends Logging {
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 798ed84b09..01c995fed4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,24 +18,28 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.DataSourceOptionsHelper
 import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieKey
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.joda.time.format.DateTimeFormat
 
 import java.sql.Timestamp
+import java.util
+import java.util.Collections
 import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS}
 
 /**
- * A complex key generator for sql command which do some process for the
- * timestamp data type partition field.
+ * Custom Spark-specific [[KeyGenerator]] overriding behavior handling [[TimestampType]] partition values
  */
-class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) {
+class SqlKeyGenerator(props: TypedProperties) extends BuiltinKeyGenerator(props) {
 
   private lazy val partitionSchema = {
     val partitionSchema = props.getString(SqlKeyGenerator.PARTITION_SCHEMA, "")
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       None
     }
   }
-  // The origin key generator class for this table.
-  private lazy val originKeyGen = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      val keyGenProps = new TypedProperties()
-      keyGenProps.putAll(props)
-      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
-      val convertedKeyGenClassName = SqlKeyGenerator.getRealKeyGenClassName(props)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
-      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
-    } else {
-      None
+
+  private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+  private lazy val originalKeyGen =
+    Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+      .map { originalKeyGenClassName =>
+        checkArgument(originalKeyGenClassName.nonEmpty)
+
+        val convertedKeyGenClassName = HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+        val keyGenProps = new TypedProperties()
+        keyGenProps.putAll(props)
+        keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+        keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, convertedKeyGenClassName)
+
+        KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+      }
+
+  override def getRecordKey(record: GenericRecord): String =
+    originalKeyGen.map {
+      _.getKey(record).getRecordKey
+    } getOrElse {
+      complexKeyGen.getRecordKey(record)
     }
+
+  override def getRecordKey(row: Row): String =
+    originalKeyGen.map {
+      _.getRecordKey(row)
+    } getOrElse {
+      complexKeyGen.getRecordKey(row)
+    }
+
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): UTF8String =
+    originalKeyGen.map {
+      _.getRecordKey(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getRecordKey(internalRow, schema)
+    }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getKey(record).getPartitionPath
+    } getOrElse {
+      complexKeyGen.getPartitionPath(record)
+    }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
   }
 
-  override def getRecordKey(record: GenericRecord): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.getKey(record).getRecordKey
-    } else {
-      super.getRecordKey(record)
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(row)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(row)
     }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
   }
 
-  override def getRecordKey(row: Row): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
-    } else {
-      super.getRecordKey(row)
+  override def getPartitionPath(internalRow: InternalRow, schema: StructType): UTF8String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(internalRow, schema)
+    }
+
+    UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString, rowType = true))
+  }
+
+  override def getRecordKeyFieldNames: util.List[String] = {
+    originalKeyGen.map(_.getRecordKeyFieldNames)
+      .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+  }
+
+  override def getPartitionPathFields: util.List[String] = {
+    originalKeyGen.map {
+      case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+      case _ =>
+        Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])
+    } getOrElse {
+      complexKeyGen.getPartitionPathFields
     }
   }
 
+  // TODO clean up
   private def convertPartitionPathToSqlType(partitionPath: String, rowType: Boolean): String = {
     if (partitionSchema.isDefined) {
       // we can split the partitionPath here because we enable the URL_ENCODE_PARTITIONING_OPT
@@ -113,30 +171,11 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       }
     } else partitionPath
   }
-
-  override def getPartitionPath(record: GenericRecord): String = {
-    val partitionPath = super.getPartitionPath(record)
-    convertPartitionPathToSqlType(partitionPath, rowType = false)
-  }
-
-  override def getPartitionPath(row: Row): String = {
-    val partitionPath = super.getPartitionPath(row)
-    convertPartitionPathToSqlType(partitionPath, rowType = true)
-  }
 }
 
 object SqlKeyGenerator {
   val PARTITION_SCHEMA = "hoodie.sql.partition.schema"
-  val ORIGIN_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
+  val ORIGINAL_KEYGEN_CLASS_NAME = "hoodie.sql.origin.keygen.class"
   private val timestampTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
   private val sqlTimestampFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.S")
-
-  def getRealKeyGenClassName(props: TypedProperties): String = {
-    val beforeKeyGenClassName = props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
-    } else {
-      DataSourceOptionsHelper.inferKeyGenClazz(props)
-    }
-  }
 }
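
With this change SqlKeyGenerator no longer extends ComplexKeyGenerator; it extends BuiltinKeyGenerator and delegates every record-key/partition-path call to the table's original key generator when hoodie.sql.origin.keygen.class is configured, falling back to an internal ComplexKeyGenerator otherwise. A stripped-down sketch of that Option-based delegation (types are simplified stand-ins, not the real Hudi interfaces):

    trait KeyGen { def partitionPath(rec: Map[String, String]): String }

    class FallbackKeyGen extends KeyGen {
      def partitionPath(rec: Map[String, String]): String = rec.getOrElse("dt", "")
    }

    class DelegatingKeyGen(original: Option[KeyGen]) extends KeyGen {
      private val fallback = new FallbackKeyGen
      // Prefer the original generator when configured, otherwise fall back,
      // then post-process the result (the real class runs
      // convertPartitionPathToSqlType on it).
      def partitionPath(rec: Map[String, String]): String =
        original.map(_.partitionPath(rec)).getOrElse(fallback.partitionPath(rec))
    }

    val nonSqlTable = new DelegatingKeyGen(original = None)
    assert(nonSqlTable.partitionPath(Map("dt" -> "2022-09-29")) == "2022-09-29")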
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 17ff34c909..2761a00205 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -534,7 +534,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 60d870a05f..4e4fe43ff9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -1064,7 +1064,7 @@ class TestHoodieSparkSqlWriter {
     // for sql write
     val m2 = Map(
       HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getName,
-      SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> classOf[SimpleKeyGenerator].getName
     )
     val kg2 = HoodieWriterUtils.getOriginKeyGenerator(m2)
     assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
index f6ce92b415..86dbae5b1e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, SchemaTestUtil}
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable
 import org.apache.spark.api.java.JavaSparkContext
+import org.junit.jupiter.api.Assertions.assertEquals
 
 import java.io.IOException
 import java.net.URL
@@ -109,10 +110,36 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase {
       val newProps: URL = this.getClass.getClassLoader.getResource("table-config.properties")
 
       // overwrite hoodie props
-      val Result = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""").collect()
-      assertResult(15) {
-        Result.length
-      }
+      val expectedOutput ="""
+          |[hoodie.archivelog.folder,archived,archive]
+          |[hoodie.database.name,default,null]
+          |[hoodie.datasource.write.drop.partition.columns,false,false]
+          |[hoodie.datasource.write.hive_style_partitioning,true,null]
+          |[hoodie.datasource.write.partitionpath.urlencode,false,null]
+          |[hoodie.table.checksum,,]
+          |[hoodie.table.create.schema,,]
+          |[hoodie.table.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator,null]
+          |[hoodie.table.name,,]
+          |[hoodie.table.precombine.field,ts,null]
+          |[hoodie.table.recordkey.fields,id,null]
+          |[hoodie.table.type,COPY_ON_WRITE,COPY_ON_WRITE]
+          |[hoodie.table.version,,]
+          |[hoodie.timeline.layout.version,,]""".stripMargin.trim
+
+      val actual = spark.sql(s"""call repair_overwrite_hoodie_props(table => '$tableName', new_props_file_path => '${newProps.getPath}')""")
+        .collect()
+        .map {
+          // omit these properties with variant values
+          case row if row.getString(0).equals("hoodie.table.checksum") => "[hoodie.table.checksum,,]"
+          case row if row.getString(0).equals("hoodie.table.create.schema") => "[hoodie.table.create.schema,,]"
+          case row if row.getString(0).equals("hoodie.table.name") => "[hoodie.table.name,,]"
+          case row if row.getString(0).equals("hoodie.table.version") => "[hoodie.table.version,,]"
+          case row if row.getString(0).equals("hoodie.timeline.layout.version") => "[hoodie.timeline.layout.version,,]"
+          case o => o.toString()
+        }
+        .mkString("\n")
+
+      assertEquals(expectedOutput, actual)
     }
   }