Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/24 13:46:08 UTC

[hudi] branch master updated: [HUDI-5941] Support savepoint call procedure with base path in Spark SQL (#8271)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6916803f7a4 [HUDI-5941] Support savepoint call procedure with base path in Spark SQL (#8271)
6916803f7a4 is described below

commit 6916803f7a40a4af57e7de1f927a4d5aa7025e32
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Fri Mar 24 06:45:58 2023 -0700

    [HUDI-5941] Support savepoint call procedure with base path in Spark SQL (#8271)
---
 .../procedures/CreateSavepointProcedure.scala      |  8 ++-
 .../procedures/DeleteSavepointProcedure.scala      |  8 ++-
 .../procedures/RollbackToSavepointProcedure.scala  |  8 ++-
 .../procedures/ShowSavepointsProcedure.scala       |  6 +-
 .../hudi/procedure/TestSavepointsProcedure.scala   | 70 +++++++++++++++-------
 5 files changed, 69 insertions(+), 31 deletions(-)
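
In short: each savepoint procedure gains an optional path parameter, and the table
parameter becomes optional, so callers can target a Hudi table by its base path
instead of a catalog table name. A minimal usage sketch in the style of the tests
below (the table name and base path are illustrative, not taken from this commit;
presumably exactly one of table or path should be supplied):

    // Resolve the table by catalog name, as before:
    spark.sql("call create_savepoint(table => 'trips', commit_time => '20230324064558000')")
    // Or resolve it directly by base path, new in this change:
    spark.sql("call create_savepoint(path => '/tmp/hudi/trips', commit_time => '20230324064558000')")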

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
index e81b6f086a2..8a40cfb502d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
@@ -28,10 +28,11 @@ import java.util.function.Supplier
 
 class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
     ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
     ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
-    ProcedureParameter.optional(3, "comments", DataTypes.StringType, "")
+    ProcedureParameter.optional(3, "comments", DataTypes.StringType, ""),
+    ProcedureParameter.optional(4, "path", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -46,11 +47,12 @@ class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with
     super.checkArgs(PARAMETERS, args)
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(4))
     val commitTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
     val user = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
     val comments = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
 
-    val basePath: String = getBasePath(tableName)
+    val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
     val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline
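
The procedures in this change delegate to a two-argument getBasePath(tableName, tablePath)
in BaseProcedure, whose body is not part of this diff. A hedged, standalone sketch of the
resolution order it plausibly implements (resolve and lookupTableLocation are illustrative
names, not Hudi APIs):

    // Prefer the catalog table name, fall back to an explicit base path,
    // and fail if neither argument was supplied.
    object BasePathResolverSketch {
      def resolve(tableName: Option[String],
                  tablePath: Option[String],
                  lookupTableLocation: String => String): String =
        tableName.map(lookupTableLocation)   // stand-in for the real catalog lookup
          .orElse(tablePath)                 // explicit base path wins when no name given
          .getOrElse(throw new IllegalArgumentException(
            "Either the table name or the table base path must be given"))
    }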
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
index 1cdd0638f1a..5d3b9b22285 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
@@ -28,8 +28,9 @@ import java.util.function.Supplier
 
 class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -44,9 +45,10 @@ class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with
     super.checkArgs(PARAMETERS, args)
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
     val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
 
-    val basePath: String = getBasePath(tableName)
+    val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
     val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants
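
delete_savepoint follows the same pattern; with the new optional path parameter it can
be invoked against a table that is not registered in the catalog. An illustrative call
(the path and instant time are placeholders, not from this commit):

    spark.sql("call delete_savepoint(path => '/tmp/hudi/trips', instant_time => '20230324064558000')")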
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
index 11f06d4a7c6..f5a20b0e201 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
@@ -28,8 +28,9 @@ import java.util.function.Supplier
 
 class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
-    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -44,9 +45,10 @@ class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder w
     super.checkArgs(PARAMETERS, args)
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
     val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
 
-    val basePath: String = getBasePath(tableName)
+    val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
     val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants
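
Likewise for rollback_to_savepoint, an illustrative path-based call (placeholders as above):

    spark.sql("call rollback_to_savepoint(path => '/tmp/hudi/trips', instant_time => '20230324064558000')")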
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
index e866e21555b..37ac857bb97 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
@@ -28,7 +28,8 @@ import java.util.stream.Collectors
 
 class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
-    ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None)
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -43,8 +44,9 @@ class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
     super.checkArgs(PARAMETERS, args)
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
 
-    val basePath: String = getBasePath(tableName)
+    val basePath: String = getBasePath(tableName, tablePath)
     val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
 
     val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline
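
And show_savepoints now has no required parameter at all, as long as one of table or
path is supplied. An illustrative path-based call:

    val savepoints = spark.sql("call show_savepoints(path => '/tmp/hudi/trips')").collect()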
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
index c0ca6735a35..33aa53cea50 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
@@ -22,6 +22,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
   test("Test Call create_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
       // create table
       spark.sql(
         s"""
@@ -31,7 +32,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
            |  price double,
            |  ts long
            |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
+           | location '$tablePath'
            | tblproperties (
            |  primaryKey = 'id',
            |  preCombineField = 'ts'
@@ -40,20 +41,29 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
 
       // insert data to table
       spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
 
-      val commits = spark.sql(s"""call show_commits(table => '$tableName')""").limit(1).collect()
-      assertResult(1) {
+      val commits = spark.sql(s"""call show_commits(table => '$tableName')""").limit(2).collect()
+      assertResult(2) {
         commits.length
       }
 
-      val commitTime = commits.apply(0).getString(0)
-      checkAnswer(s"""call create_savepoint('$tableName', '$commitTime', 'admin', '1')""")(Seq(true))
+      // Create savepoint using table name
+      val commitTime1 = commits.apply(0).getString(0)
+      checkAnswer(s"""call create_savepoint('$tableName', '$commitTime1', 'admin', '1')""")(Seq(true))
+
+      // Create savepoint using table base path
+      val commitTime2 = commits.apply(1).getString(0)
+      checkAnswer(
+        s"""call create_savepoint(path => '$tablePath', commit_time => '$commitTime2',
+           | user => 'admin', comments => '2')""".stripMargin)(Seq(true))
     }
   }
 
   test("Test Call show_savepoints Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
       // create table
       spark.sql(
         s"""
@@ -63,7 +73,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
            |  price double,
            |  ts long
            |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
+           | location '$tablePath'
            | tblproperties (
            |  primaryKey = 'id',
            |  preCombineField = 'ts'
@@ -83,17 +93,22 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
       val commitTime = commits.apply(1).getString(0)
       checkAnswer(s"""call create_savepoint('$tableName', '$commitTime')""")(Seq(true))
 
-      // show savepoints
+      // Show savepoints using table name
       val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
       assertResult(1) {
         savepoints.length
       }
+      // Show savepoints using table base path
+      assertResult(1) {
+        spark.sql(s"""call show_savepoints(path => '$tablePath')""").collect().length
+      }
     }
   }
 
   test("Test Call delete_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
       // create table
       spark.sql(
         s"""
@@ -103,7 +118,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
            |  price double,
            |  ts long
            |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
+           | location '$tablePath'
            | tblproperties (
            |  primaryKey = 'id',
            |  preCombineField = 'ts'
@@ -125,20 +140,26 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
         checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
       })
 
-      // delete savepoints
+      // Delete a savepoint with table name
       checkAnswer(s"""call delete_savepoint('$tableName', '${commits.apply(1).getString(0)}')""")(Seq(true))
+      // Delete a savepoint with table base path
+      checkAnswer(
+        s"""call delete_savepoint(path => '$tablePath',
+           | instant_time => '${commits.apply(0).getString(0)}')""".stripMargin)(Seq(true))
 
-      // show savepoints with only 2
+      // show_savepoints should return one savepoint
       val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
-      assertResult(2) {
+      assertResult(1) {
         savepoints.length
       }
+      assertResult(commits(2).getString(0))(savepoints(0).getString(0))
     }
   }
 
   test("Test Call rollback_to_savepoint Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
       // create table
       spark.sql(
         s"""
@@ -148,7 +169,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
            |  price double,
            |  ts long
            |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
+           | location '$tablePath'
            | tblproperties (
            |  primaryKey = 'id',
            |  preCombineField = 'ts'
@@ -158,19 +179,28 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
       // insert data to table
       spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
       spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
 
       val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect()
-      assertResult(2) {
+        .map(c => c.getString(0)).sorted
+      assertResult(3) {
         commits.length
       }
 
-      // create 2 savepoints
-      commits.foreach(r => {
-        checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
-      })
-
-      // rollback savepoints
-      checkAnswer(s"""call rollback_to_savepoint('$tableName', '${commits.apply(0).getString(0)}')""")(Seq(true))
+      // create 2 savepoints
+      checkAnswer(s"""call create_savepoint('$tableName', '${commits(0)}')""")(Seq(true))
+      checkAnswer(s"""call create_savepoint('$tableName', '${commits(1)}')""")(Seq(true))
+
+      // rollback to the second savepoint with the table name
+      checkAnswer(s"""call rollback_to_savepoint('$tableName', '${commits(1)}')""")(Seq(true))
+      checkAnswer(
+        s"""call delete_savepoint(path => '$tablePath',
+           | instant_time => '${commits(1)}')""".stripMargin)(Seq(true))
+
+      // rollback to the first savepoint with the table base path
+      checkAnswer(
+        s"""call rollback_to_savepoint(path => '$tablePath',
+           | instant_time => '${commits(0)}')""".stripMargin)(Seq(true))
     }
   }
 }