Posted to commits@hudi.apache.org by fo...@apache.org on 2023/04/25 03:11:00 UTC

[hudi] branch master updated: [HUDI-5957] Fix table not exist when using 'db.table' in createHoodieClientFromPath (#8488)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 51ccb2c61b9 [HUDI-5957] Fix table not exist when using 'db.table' in createHoodieClientFromPath (#8488)
51ccb2c61b9 is described below

commit 51ccb2c61b9d31c4533946c0b407812a80d6796f
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Tue Apr 25 11:10:52 2023 +0800

    [HUDI-5957] Fix table not exist when using 'db.table' in createHoodieClientFromPath (#8488)
---
 .../main/scala/org/apache/hudi/HoodieCLIUtils.scala  | 20 ++++++++++++++------
 .../sql/hudi/command/procedures/BaseProcedure.scala  |  8 --------
 .../procedures/CreateSavepointProcedure.scala        |  4 +++-
 .../command/procedures/DeleteMarkerProcedure.scala   |  4 +++-
 .../procedures/DeleteSavepointProcedure.scala        |  4 +++-
 .../procedures/RollbackToInstantTimeProcedure.scala  |  4 +---
 .../procedures/RollbackToSavepointProcedure.scala    |  4 +++-
 .../hudi/command/procedures/RunCleanProcedure.scala  |  3 ++-
 .../command/procedures/RunClusteringProcedure.scala  |  3 ++-
 .../command/procedures/RunCompactionProcedure.scala  |  3 ++-
 .../spark/sql/hudi/TestAlterTableDropPartition.scala |  6 +++---
 .../sql/hudi/procedure/TestClusteringProcedure.scala |  7 ++++---
 12 files changed, 40 insertions(+), 30 deletions(-)
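
In short, this patch renames HoodieCLIUtils.createHoodieClientFromPath to
createHoodieWriteClient and threads an explicit tableName: Option[String]
through from each procedure. Catalog properties are now looked up via the
session catalog only when the caller supplies a name, instead of being
re-derived from metaClient.getTableConfig.getTableName, which stores the
unqualified table name and therefore could not be resolved for tables
referenced as 'db.table'. A minimal sketch of the new call pattern (not part
of the patch; spark and basePath are assumed to be in scope):

    import org.apache.hudi.HoodieCLIUtils

    // A database-qualified name now survives into the catalog lookup;
    // passing None skips the catalog entirely and builds the client from
    // the supplied configs plus the table type read from the meta client.
    val client = HoodieCLIUtils.createHoodieWriteClient(
      spark, basePath, Map.empty, Some("db.table"))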

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 1ef91c7cd35..5f0cba6fd7c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -27,21 +27,29 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.withSparkConf
 
 import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}
 
-object HoodieCLIUtils extends ProvidesHoodieConfig{
+object HoodieCLIUtils {
 
-  def createHoodieClientFromPath(sparkSession: SparkSession,
-                                 basePath: String,
-                                 conf: Map[String, String]): SparkRDDWriteClient[_] = {
+  def createHoodieWriteClient(sparkSession: SparkSession,
+                              basePath: String,
+                              conf: Map[String, String],
+                              tableName: Option[String]): SparkRDDWriteClient[_] = {
     val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
       .setConf(sparkSession.sessionState.newHadoopConf()).build()
     val schemaUtil = new TableSchemaResolver(metaClient)
     val schemaStr = schemaUtil.getTableAvroSchema(false).toString
+    // If tableName is provided, we need to add catalog props
+    val catalogProps = tableName match {
+      case Some(value) => getHoodieCatalogTable(sparkSession, value).catalogProperties
+      case None => Map.empty
+    }
     val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults(
-      buildHoodieConfig(getHoodieCatalogTable(sparkSession, metaClient.getTableConfig.getTableName)) ++ conf)
+      withSparkConf(sparkSession, Map.empty)(
+        catalogProps ++ conf + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name()))
+    )
 
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
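
For context: the removed code always fed metaClient.getTableConfig.getTableName
-- an unqualified name -- back into getHoodieCatalogTable. A hypothetical
illustration in plain Spark of why the qualifier matters (this snippet is not
in the patch; it only shows how Spark parses the two spellings):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // 'db.table' keeps its database qualifier through parsing ...
    val qualified = CatalystSqlParser.parseTableIdentifier("db.table")
    assert(qualified == TableIdentifier("table", Some("db")))

    // ... while the unqualified name from the table config is resolved
    // against the session's current database, which raises "table not
    // exist" whenever that database is not the one holding the table.
    val unqualified = CatalystSqlParser.parseTableIdentifier("table")
    assert(unqualified == TableIdentifier("table", None))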
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index 67930cb3ed4..2930fc36c4c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -18,9 +18,6 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.HoodieCLIUtils
-import org.apache.hudi.client.SparkRDDWriteClient
-import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.model.HoodieRecordPayload
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.HoodieIndex.IndexType
@@ -37,11 +34,6 @@ abstract class BaseProcedure extends Procedure {
 
   protected def sparkSession: SparkSession = spark
 
-  protected def createHoodieClient(jsc: JavaSparkContext, basePath: String): SparkRDDWriteClient[_ <: HoodieRecordPayload[_ <: AnyRef]] = {
-    val config = getWriteConfig(basePath)
-    new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config)
-  }
-
   protected def getWriteConfig(basePath: String): HoodieWriteConfig = {
     HoodieWriteConfig.newBuilder
       .withPath(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
index 3d018230b4f..1983d825ad4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.exception.{HoodieException, HoodieSavepointException}
@@ -60,7 +61,8 @@ class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with
       throw new HoodieException("Commit " + commitTime + " not found in Commits " + activeTimeline)
     }
 
-    val client = createHoodieClient(jsc, basePath)
+    val client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
+      tableName.asInstanceOf[Option[String]])
     var result = false
 
     try {
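
A side note on the tableName.asInstanceOf[Option[String]] cast that recurs in
this and the following procedures: procedure arguments are fetched untyped from
the argument map, so each call site narrows the optional table-name argument
before handing it to the new helper. A sketch of the pattern as it appears
inside a procedure's call method, assuming getArgValueOrDefault returns
Option[Any] as declared in BaseProcedure:

    // Arguments are looked up generically, so the element type is Any.
    val tableName: Option[Any] = getArgValueOrDefault(args, PARAMETERS(0))
    val client = HoodieCLIUtils.createHoodieWriteClient(
      sparkSession, basePath, Map.empty, tableName.asInstanceOf[Option[String]])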
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
index d99a5489799..32c853345d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteMarkerProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.table.HoodieSparkTable
 import org.apache.hudi.table.marker.WriteMarkersFactory
@@ -50,7 +51,8 @@ class DeleteMarkerProcedure extends BaseProcedure with ProcedureBuilder with Log
 
     var client: SparkRDDWriteClient[_] = null
     val result = Try {
-      client = createHoodieClient(jsc, basePath)
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
+        tableName.asInstanceOf[Option[String]])
       val config = client.getConfig
       val context = client.getEngineContext
       val table = HoodieSparkTable.create(config, context)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
index d234eae224e..e9d13914789 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.exception.{HoodieException, HoodieSavepointException}
@@ -59,7 +60,8 @@ class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with
       throw new HoodieException("Commit " + instantTime + " not found in Commits " + completedInstants)
     }
 
-    val client = createHoodieClient(jsc, basePath)
+    val client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
+      tableName.asInstanceOf[Option[String]])
     var result = false
 
     try {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
index c39cca7d68c..862b74b427a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToInstantTimeProcedure.scala
@@ -26,8 +26,6 @@ import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE
 import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
@@ -55,7 +53,7 @@ class RollbackToInstantTimeProcedure extends BaseProcedure with ProcedureBuilder
     val basePath = hoodieCatalogTable.tableLocation
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = createHoodieClient(jsc, basePath)
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty, scala.Option(table))
       client.getConfig.setValue(ROLLBACK_USING_MARKERS_ENABLE, "false")
       val config = getWriteConfig(basePath)
       val metaClient = HoodieTableMetaClient.builder
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
index 15c2a083c69..955ceb01045 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.exception.{HoodieException, HoodieSavepointException}
@@ -59,7 +60,8 @@ class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder w
       throw new HoodieException("Commit " + instantTime + " not found in Commits " + completedInstants)
     }
 
-    val client = createHoodieClient(jsc, basePath)
+    val client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
+      tableName.asInstanceOf[Option[String]])
     var result = false
 
     try {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 43d636b65ec..f7653ce680e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -82,7 +82,8 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
 
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, props,
+        tableName.asInstanceOf[Option[String]])
       val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
 
       if (hoodieCleanMeta == null) Seq.empty
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 88e5c4100b9..eba972382ea 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -174,7 +174,8 @@ class RunClusteringProcedure extends BaseProcedure
 
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, conf,
+        tableName.asInstanceOf[Option[String]])
       if (operator.isSchedule) {
         val instantTime = HoodieActiveTimeline.createNewInstantTime
         if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
index d79cf8c302f..47a20f3cf6c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCompactionProcedure.scala
@@ -68,7 +68,8 @@ class RunCompactionProcedure extends BaseProcedure with ProcedureBuilder with Sp
 
     var client: SparkRDDWriteClient[_] = null
     try {
-      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, Map.empty)
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
+        tableName.asInstanceOf[Option[String]])
       var willCompactionInstants: Seq[String] = Seq.empty
       operation match {
         case "schedule" =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 80cea8b0310..c1e0643b005 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -511,7 +511,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
       spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
-      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+      val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
 
       // Generate the first clustering plan
       val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
@@ -555,7 +555,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
       spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)")
       spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)")
-      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+      val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
 
       // Generate the first compaction plan
       val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
@@ -600,7 +600,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
       spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
       spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
       spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")
-      val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+      val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
 
       // Generate the first log_compaction plan
       val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 8124427488c..7b8dc8f8a90 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -62,7 +62,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
         spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
         spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
-        val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
         // Generate the first clustering plan
         val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
         client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
@@ -163,7 +163,7 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
         spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
         spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
         spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
-        val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
         // Generate the first clustering plan
         val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
         client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
@@ -694,7 +694,8 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
            |  type = 'cow',
            |  preCombineField = 'ts',
            |  hoodie.index.type = 'BUCKET',
-           |  hoodie.bucket.index.hash.field = 'id'
+           |  hoodie.bucket.index.hash.field = 'id',
+           |  hoodie.datasource.write.recordkey.field = 'id'
            | )
            | partitioned by (ts)
            | location '$basePath'
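
A hypothetical end-to-end reproduction of HUDI-5957 (not part of the patch;
the table and database names are made up, and the procedure-call syntax
follows Hudi's Spark SQL procedures):

    // Create and populate a table under an explicit database.
    spark.sql("create database if not exists db")
    spark.sql(
      """create table db.tbl (id int, name string, price double, ts long)
        |using hudi
        |tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)
    spark.sql("insert into db.tbl values (1, 'a1', 10.0, 1000)")

    // Before this patch, the qualified name was lost inside
    // createHoodieClientFromPath and the call failed with "table not
    // exist" unless the current database happened to be 'db'; with the
    // patch the name reaches the catalog lookup intact.
    spark.sql("call run_clustering(table => 'db.tbl')").show(false)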