Posted to commits@hudi.apache.org by fo...@apache.org on 2022/09/23 08:04:48 UTC
[hudi] branch master updated: [HUDI-4559] Support hiveSync command based on Call Produce Command (#6322)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a47dec71c3 [HUDI-4559] Support hiveSync command based on Call Produce Command (#6322)
a47dec71c3 is described below
commit a47dec71c3b6b5faff63512b42814078d0911c21
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Fri Sep 23 16:04:42 2022 +0800
[HUDI-4559] Support hiveSync command based on Call Produce Command (#6322)
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 33 ++++---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 6 +-
.../hudi/command/procedures/BaseProcedure.scala | 34 +------
.../command/procedures/HiveSyncProcedure.scala | 107 +++++++++++++++++++++
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
5 files changed, 131 insertions(+), 50 deletions(-)
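
For context, Hudi exposes these procedures through Spark SQL's CALL syntax (with the Hudi Spark SQL extensions enabled). Based on the parameters declared in the new HiveSyncProcedure file below, an invocation could look like the following sketch; the table name, metastore URI, and sync mode are illustrative values, not part of this commit:

    // Minimal sketch: trigger hive sync for a Hudi table from a Spark session.
    // Only "table" is required; the remaining arguments are optional overrides.
    // Assumes `spark` is a SparkSession with Hudi's SQL extensions enabled.
    spark.sql(
      """CALL hive_sync(
        |  table => 'hudi_table',
        |  metastore_uri => 'thrift://localhost:9083',
        |  mode => 'hms'
        |)""".stripMargin).show(false)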
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7d6db19edf..f3f7e66490 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -55,7 +55,6 @@ trait ProvidesHoodieConfig extends Logging {
require(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
- val enableHive = isUsingHiveCatalog(sparkSession)
val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
@@ -73,8 +72,8 @@ trait ProvidesHoodieConfig extends Logging {
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -172,8 +171,6 @@ trait ProvidesHoodieConfig extends Logging {
logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName")
- val enableHive = isUsingHiveCatalog(sparkSession)
-
withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> path,
@@ -191,8 +188,8 @@ trait ProvidesHoodieConfig extends Logging {
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -211,7 +208,6 @@ trait ProvidesHoodieConfig extends Logging {
hoodieCatalogTable: HoodieCatalogTable,
partitionsToDrop: String): Map[String, String] = {
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
- val enableHive = isUsingHiveCatalog(sparkSession)
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
@@ -228,8 +224,8 @@ trait ProvidesHoodieConfig extends Logging {
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
PARTITIONPATH_FIELD.key -> partitionFields,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -271,8 +267,8 @@ trait ProvidesHoodieConfig extends Logging {
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
@@ -291,12 +287,23 @@ trait ProvidesHoodieConfig extends Logging {
hoodieConfig.getProps
}
- def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
+ def buildHiveSyncConfig(
+ props: TypedProperties,
+ hoodieCatalogTable: HoodieCatalogTable,
+ sparkSession: SparkSession = SparkSession.active): HiveSyncConfig = {
+ // Enable hive sync by default if Spark is using the Hive metastore catalog.
+ val enableHive = isUsingHiveCatalog(sparkSession)
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props)
+ hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_ENABLED.key, enableHive.toString)
+ hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key, enableHive.toString)
+ hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name()))
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH, hoodieCatalogTable.tableLocation)
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, hoodieCatalogTable.baseFileFormat)
hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_DATABASE_NAME, hoodieCatalogTable.table.identifier.database.getOrElse("default"))
hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_TABLE_NAME, hoodieCatalogTable.table.identifier.table)
+ if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) {
+ hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS, props.getString(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key))
+ }
hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS, classOf[MultiPartKeysValueExtractor].getName)
hiveSyncConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE, "true")
if (hiveSyncConfig.useBucketSync())
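
The net effect of this hunk: the enable-by-default decision, previously computed as enableHive at each call site, now happens once inside buildHiveSyncConfig, and call sites read the resolved values back from the returned HiveSyncConfig. A condensed sketch of the new call-site pattern, with names taken from the diff above (this is an excerpt-style illustration, not standalone code):

    // Build the sync config once; read the resolved flags back instead of
    // recomputing isUsingHiveCatalog(sparkSession) at every call site.
    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable, sparkSession)
    val metaSyncEnabled = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key)
    val hiveSyncEnabled = hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key)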
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 7fb2dc1ebd..d403f1998c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -487,8 +487,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
- // Enable the hive sync by default if spark have enable the hive metastore.
- val enableHive = isUsingHiveCatalog(sparkSession)
withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
Map(
"path" -> path,
@@ -501,9 +499,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
- HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+ HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
- HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> enableHive.toString,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index 70799d3dc1..d0404664f4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -24,17 +24,11 @@ import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieClusteringException
import org.apache.hudi.index.HoodieIndex.IndexType
-import org.apache.spark.SparkException
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
-import java.nio.charset.Charset
-import java.sql.{Date, Timestamp}
-
abstract class BaseProcedure extends Procedure {
val INVALID_ARG_INDEX: Int = -1
@@ -121,30 +115,4 @@ abstract class BaseProcedure extends Procedure {
)
}
- protected def convertCatalystType(value: String, dataType: DataType): Any = {
- try {
- val valueWithType = dataType match {
- case StringType => value
- case BinaryType => value.getBytes(Charset.forName("utf-8"))
- case BooleanType => value.toBoolean
- case DoubleType => value.toDouble
- case d: DecimalType => Decimal.apply(BigDecimal(value), d.precision, d.scale)
- case FloatType => value.toFloat
- case ByteType => value.toByte
- case IntegerType => value.toInt
- case LongType => value.toLong
- case ShortType => value.toShort
- case DateType => DateTimeUtils.fromJavaDate(Date.valueOf(value))
- case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(value))
- case _ => throw new HoodieClusteringException("Data type not support:" + dataType)
- }
-
- valueWithType
- } catch {
- case e: HoodieClusteringException =>
- throw e
- case _ =>
- throw new HoodieClusteringException("Data type not match, value:" + value + ", dataType:" + dataType)
- }
- }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
new file mode 100644
index 0000000000..24944b1270
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class HiveSyncProcedure extends BaseProcedure with ProcedureBuilder
+ with ProvidesHoodieConfig with Logging {
+
+ private val PARAMETERS = Array[ProcedureParameter](
+ ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.optional(1, "metastore_uri", DataTypes.StringType, ""),
+ ProcedureParameter.optional(2, "username", DataTypes.StringType, ""),
+ ProcedureParameter.optional(3, "password", DataTypes.StringType, ""),
+ ProcedureParameter.optional(4, "use_jdbc", DataTypes.StringType, ""),
+ ProcedureParameter.optional(5, "mode", DataTypes.StringType, ""),
+ ProcedureParameter.optional(6, "partition_fields", DataTypes.StringType, ""),
+ ProcedureParameter.optional(7, "partition_extractor_class", DataTypes.StringType, "")
+ )
+
+ private val OUTPUT_TYPE = new StructType(Array[StructField](
+ StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
+ ))
+
+ override def build: Procedure = new HiveSyncProcedure
+
+ override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+ override def outputType: StructType = OUTPUT_TYPE
+
+ override def call(args: ProcedureArgs): Seq[Row] = {
+ super.checkArgs(PARAMETERS, args)
+
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+ val metastoreUri = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+ val username = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
+ val password = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+ val useJdbc = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[String]
+ val mode = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[String]
+ val partitionFields = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String]
+ val partitionExtractorClass = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String]
+
+ val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, tableName)
+ val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+ val hiveConf = new HiveConf
+ val sqlConf = sparkSession.sqlContext.conf
+
+ if (metastoreUri.nonEmpty) hadoopConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri)
+ if (username.nonEmpty) sqlConf.setConfString(HiveSyncConfig.HIVE_USER.key, username)
+ if (password.nonEmpty) sqlConf.setConfString(HiveSyncConfig.HIVE_PASS.key, password)
+ if (useJdbc.nonEmpty) sqlConf.setConfString(HiveSyncConfig.HIVE_USE_JDBC.key, useJdbc)
+ if (mode.nonEmpty) sqlConf.setConfString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, mode)
+ if (partitionFields.nonEmpty) sqlConf.setConfString(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, partitionFields)
+ if (partitionExtractorClass.nonEmpty) sqlConf.setConfString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, partitionExtractorClass)
+
+ hiveConf.addResource(hadoopConf)
+
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val hoodieProps = getHoodieProps(hoodieCatalogTable.catalogProperties, tableConfig, sqlConf)
+ val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+
+ var hiveSyncTool: HiveSyncTool = null
+ try {
+ hiveSyncTool = new HiveSyncTool(hiveSyncConfig.getProps, hiveConf)
+ hiveSyncTool.syncHoodieTable()
+ } catch {
+ case e: RuntimeException => throw new HoodieException("hive sync failed", e)
+ } finally {
+ if (hiveSyncTool != null) hiveSyncTool.close()
+ }
+
+ Seq(Row("hive sync success."))
+ }
+}
+
+object HiveSyncProcedure {
+ val NAME = "hive_sync"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get() = new HiveSyncProcedure
+ }
+}
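
Note how the procedure body above routes its optional arguments: metastore_uri is written into the Hadoop configuration, while the other non-empty arguments land in the session SQLConf before getHoodieProps and buildHiveSyncConfig run, so per-call values take precedence over table and catalog properties for that invocation. On success the call returns a single row, "hive sync success.". A hedged example exercising those overrides (the credentials and mode are placeholders):

    // Illustrative call using JDBC mode; use_jdbc/username/password are set
    // in the SQLConf first and therefore win over table-level sync settings.
    spark.sql(
      "CALL hive_sync(table => 'hudi_table', mode => 'jdbc', " +
        "use_jdbc => 'true', username => 'hive', password => 'hive')")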
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 2eee942ff0..b2bbec8489 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -79,6 +79,7 @@ object HoodieProcedures {
,(RunCleanProcedure.NAME, RunCleanProcedure.builder)
,(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
,(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
+ ,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
)
}
}