Posted to commits@hudi.apache.org by fo...@apache.org on 2022/09/23 08:04:48 UTC

[hudi] branch master updated: [HUDI-4559] Support hiveSync command based on Call Procedure Command (#6322)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a47dec71c3 [HUDI-4559] Support hiveSync command based on Call Procedure Command (#6322)
a47dec71c3 is described below

commit a47dec71c3b6b5faff63512b42814078d0911c21
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Fri Sep 23 16:04:42 2022 +0800

    [HUDI-4559] Support hiveSync command based on Call Procedure Command (#6322)
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  33 ++++---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   6 +-
 .../hudi/command/procedures/BaseProcedure.scala    |  34 +------
 .../command/procedures/HiveSyncProcedure.scala     | 107 +++++++++++++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 5 files changed, 131 insertions(+), 50 deletions(-)
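
The commit adds a hive_sync stored procedure that is invoked through Spark SQL's CALL syntax. As a minimal, illustrative sketch only (the table name, metastore URI, and partition field below are assumptions; the procedure name and parameters come from HiveSyncProcedure in the diff that follows), it could be driven from Scala like this:

    // Assumes an active SparkSession with the Hudi SQL extensions enabled
    // and an existing Hudi table named "h1" (both hypothetical).
    spark.sql(
      """CALL hive_sync(
        |  table => 'h1',
        |  metastore_uri => 'thrift://localhost:9083',
        |  mode => 'hms',
        |  partition_fields => 'dt')""".stripMargin
    ).show(false)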

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7d6db19edf..f3f7e66490 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -55,7 +55,6 @@ trait ProvidesHoodieConfig extends Logging {
 
     require(hoodieCatalogTable.primaryKeys.nonEmpty,
       s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
-    val enableHive = isUsingHiveCatalog(sparkSession)
 
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
 
@@ -73,8 +72,8 @@ trait ProvidesHoodieConfig extends Logging {
         SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -172,8 +171,6 @@ trait ProvidesHoodieConfig extends Logging {
 
     logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName")
 
-    val enableHive = isUsingHiveCatalog(sparkSession)
-
     withSparkConf(sparkSession, catalogProperties) {
       Map(
         "path" -> path,
@@ -191,8 +188,8 @@ trait ProvidesHoodieConfig extends Logging {
         ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -211,7 +208,6 @@ trait ProvidesHoodieConfig extends Logging {
                                  hoodieCatalogTable: HoodieCatalogTable,
                                  partitionsToDrop: String): Map[String, String] = {
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-    val enableHive = isUsingHiveCatalog(sparkSession)
     val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
 
@@ -228,8 +224,8 @@ trait ProvidesHoodieConfig extends Logging {
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
         PARTITIONPATH_FIELD.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -271,8 +267,8 @@ trait ProvidesHoodieConfig extends Logging {
         SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -291,12 +287,23 @@ trait ProvidesHoodieConfig extends Logging {
     hoodieConfig.getProps
   }
 
-  def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
+  def buildHiveSyncConfig(
+     props: TypedProperties,
+     hoodieCatalogTable: HoodieCatalogTable,
+     sparkSession: SparkSession = SparkSession.active): HiveSyncConfig = {
+    // Enable hive sync by default if Spark has enabled the Hive metastore.
+    val enableHive = isUsingHiveCatalog(sparkSession)
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props)
+    hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_ENABLED.key, enableHive.toString)
+    hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key, enableHive.toString)
+    hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name()))
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH, hoodieCatalogTable.tableLocation)
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, hoodieCatalogTable.baseFileFormat)
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_DATABASE_NAME, hoodieCatalogTable.table.identifier.database.getOrElse("default"))
     hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_TABLE_NAME, hoodieCatalogTable.table.identifier.table)
+    if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) {
+      hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS, props.getString(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key))
+    }
     hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS, classOf[MultiPartKeysValueExtractor].getName)
     hiveSyncConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE, "true")
     if (hiveSyncConfig.useBucketSync())
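
With the ProvidesHoodieConfig changes above, deciding whether hive sync is enabled now happens once inside buildHiveSyncConfig (defaulting to whether Spark is using the Hive catalog), and the insert/update/delete/drop-partition paths read the resulting flags back from the returned config instead of recomputing them. A minimal sketch of that calling pattern, where hoodieProps and hoodieCatalogTable stand in for values each command already computes:

    // sparkSession defaults to SparkSession.active in the new signature
    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
    val metaSyncEnabled = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key)
    val hiveSyncEnabled = hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key)
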
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 7fb2dc1ebd..d403f1998c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -487,8 +487,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
     val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
     val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
-    // Enable the hive sync by default if spark have enable the hive metastore.
-    val enableHive = isUsingHiveCatalog(sparkSession)
     withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
       Map(
         "path" -> path,
@@ -501,9 +499,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
         HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
         HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index 70799d3dc1..d0404664f4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -24,17 +24,11 @@ import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieClusteringException
 import org.apache.hudi.index.HoodieIndex.IndexType
-import org.apache.spark.SparkException
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
-import java.nio.charset.Charset
-import java.sql.{Date, Timestamp}
-
 abstract class BaseProcedure extends Procedure {
   val INVALID_ARG_INDEX: Int = -1
 
@@ -121,30 +115,4 @@ abstract class BaseProcedure extends Procedure {
       )
   }
 
-  protected def convertCatalystType(value: String, dataType: DataType): Any = {
-    try {
-      val valueWithType = dataType match {
-        case StringType => value
-        case BinaryType => value.getBytes(Charset.forName("utf-8"))
-        case BooleanType => value.toBoolean
-        case DoubleType => value.toDouble
-        case d: DecimalType => Decimal.apply(BigDecimal(value), d.precision, d.scale)
-        case FloatType => value.toFloat
-        case ByteType => value.toByte
-        case IntegerType => value.toInt
-        case LongType => value.toLong
-        case ShortType => value.toShort
-        case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value))
-        case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
-        case _ => throw new HoodieClusteringException("Data type not support:" + dataType)
-      }
-
-      valueWithType
-    } catch {
-      case e: HoodieClusteringException =>
-        throw e
-      case _ =>
-        throw new HoodieClusteringException("Data type not match, value:" + value + ", dataType:" + dataType)
-    }
-  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
new file mode 100644
index 0000000000..24944b1270
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class HiveSyncProcedure extends BaseProcedure with ProcedureBuilder
+  with ProvidesHoodieConfig with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "metastore_uri", DataTypes.StringType, ""),
+    ProcedureParameter.optional(2, "username", DataTypes.StringType, ""),
+    ProcedureParameter.optional(3, "password", DataTypes.StringType, ""),
+    ProcedureParameter.optional(4, "use_jdbc", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "mode", DataTypes.StringType, ""),
+    ProcedureParameter.optional(6, "partition_fields", DataTypes.StringType, ""),
+    ProcedureParameter.optional(7, "partition_extractor_class", DataTypes.StringType, "")
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def build: Procedure = new HiveSyncProcedure
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val metastoreUri = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val username = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+    val password = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val useJdbc = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+    val mode = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+    val partitionFields = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+    val partitionExtractorClass = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, tableName)
+    val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+    val hiveConf = new HiveConf
+    val sqlConf = sparkSession.sqlContext.conf
+
+    if (metastoreUri.nonEmpty) hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
+    if (username.nonEmpty) sqlConf.setConfString(HiveSyncConfig.HIVE_USER.key, username)
+    if (password.nonEmpty) sqlConf.setConfString(HiveSyncConfig.HIVE_PASS.key, password)
+    if (useJdbc.nonEmpty) sqlConf.setConfString(HiveSyncConfig.HIVE_USE_JDBC.key, useJdbc)
+    if (mode.nonEmpty) sqlConf.setConfString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, mode)
+    if (partitionFields.nonEmpty) sqlConf.setConfString(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, partitionFields)
+    if (partitionExtractorClass.nonEmpty) sqlConf.setConfString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, partitionExtractorClass)
+
+    hiveConf.addResource(hadoopConf)
+
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val hoodieProps = getHoodieProps(hoodieCatalogTable.catalogProperties, tableConfig, sqlConf)
+    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+
+    var hiveSyncTool: HiveSyncTool = null
+    try {
+      hiveSyncTool = new HiveSyncTool(hiveSyncConfig.getProps, hiveConf)
+      hiveSyncTool.syncHoodieTable()
+    } catch {
+      case e: RuntimeException => throw new HoodieException("hive sync failed", e)
+    } finally {
+      if (hiveSyncTool != null) hiveSyncTool.close()
+    }
+
+    Seq(Row("hive sync success."))
+  }
+}
+
+object HiveSyncProcedure {
+  val NAME = "hive_sync"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new HiveSyncProcedure
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 2eee942ff0..b2bbec8489 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -79,6 +79,7 @@ object HoodieProcedures {
       ,(RunCleanProcedure.NAME, RunCleanProcedure.builder)
       ,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
       ,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
+      ,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
     )
   }
 }