You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/03/24 13:46:08 UTC
[hudi] branch master updated: [HUDI-5941] Support savepoint call procedure with base path in Spark SQL (#8271)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6916803f7a4 [HUDI-5941] Support savepoint call procedure with base path in Spark SQL (#8271)
6916803f7a4 is described below
commit 6916803f7a40a4af57e7de1f927a4d5aa7025e32
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Fri Mar 24 06:45:58 2023 -0700
[HUDI-5941] Support savepoint call procedure with base path in Spark SQL (#8271)
---
.../procedures/CreateSavepointProcedure.scala | 8 ++-
.../procedures/DeleteSavepointProcedure.scala | 8 ++-
.../procedures/RollbackToSavepointProcedure.scala | 8 ++-
.../procedures/ShowSavepointsProcedure.scala | 6 +-
.../hudi/procedure/TestSavepointsProcedure.scala | 70 +++++++++++++++-------
5 files changed, 69 insertions(+), 31 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
index e81b6f086a2..8a40cfb502d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CreateSavepointProcedure.scala
@@ -28,10 +28,11 @@ import java.util.function.Supplier
class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
ProcedureParameter.required(1, "commit_time", DataTypes.StringType, None),
ProcedureParameter.optional(2, "user", DataTypes.StringType, ""),
- ProcedureParameter.optional(3, "comments", DataTypes.StringType, "")
+ ProcedureParameter.optional(3, "comments", DataTypes.StringType, ""),
+ ProcedureParameter.optional(4, "path", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -46,11 +47,12 @@ class CreateSavepointProcedure extends BaseProcedure with ProcedureBuilder with
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(4))
val commitTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
val user = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
val comments = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
- val basePath: String = getBasePath(tableName)
+ val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
index 1cdd0638f1a..5d3b9b22285 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/DeleteSavepointProcedure.scala
@@ -28,8 +28,9 @@ import java.util.function.Supplier
class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
+ ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -44,9 +45,10 @@ class DeleteSavepointProcedure extends BaseProcedure with ProcedureBuilder with
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
- val basePath: String = getBasePath(tableName)
+ val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
index 11f06d4a7c6..f5a20b0e201 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RollbackToSavepointProcedure.scala
@@ -28,8 +28,9 @@ import java.util.function.Supplier
class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder with Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None),
- ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.required(1, "instant_time", DataTypes.StringType, None),
+ ProcedureParameter.optional(2, "path", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -44,9 +45,10 @@ class RollbackToSavepointProcedure extends BaseProcedure with ProcedureBuilder w
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(2))
val instantTime = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
- val basePath: String = getBasePath(tableName)
+ val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val completedInstants = metaClient.getActiveTimeline.getSavePointTimeline.filterCompletedInstants
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
index e866e21555b..37ac857bb97 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowSavepointsProcedure.scala
@@ -28,7 +28,8 @@ import java.util.stream.Collectors
class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType, None)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType, None)
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -43,8 +44,9 @@ class ShowSavepointsProcedure extends BaseProcedure with ProcedureBuilder {
super.checkArgs(PARAMETERS, args)
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
- val basePath: String = getBasePath(tableName)
+ val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val activeTimeline: HoodieActiveTimeline = metaClient.getActiveTimeline
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
index c0ca6735a35..33aa53cea50 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestSavepointsProcedure.scala
@@ -22,6 +22,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
test("Test Call create_savepoint Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
// create table
spark.sql(
s"""
@@ -31,7 +32,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
| price double,
| ts long
|) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
+ | location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
@@ -40,20 +41,29 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
- val commits = spark.sql(s"""call show_commits(table => '$tableName')""").limit(1).collect()
- assertResult(1) {
+ val commits = spark.sql(s"""call show_commits(table => '$tableName')""").limit(2).collect()
+ assertResult(2) {
commits.length
}
- val commitTime = commits.apply(0).getString(0)
- checkAnswer(s"""call create_savepoint('$tableName', '$commitTime', 'admin', '1')""")(Seq(true))
+ // Create savepoint using table name
+ val commitTime1 = commits.apply(0).getString(0)
+ checkAnswer(s"""call create_savepoint('$tableName', '$commitTime1', 'admin', '1')""")(Seq(true))
+
+ // Create savepoint using table base path
+ val commitTime2 = commits.apply(1).getString(0)
+ checkAnswer(
+ s"""call create_savepoint(path => '$tablePath', commit_time => '$commitTime2',
+ | user => 'admin', comments => '2')""".stripMargin)(Seq(true))
}
}
test("Test Call show_savepoints Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
// create table
spark.sql(
s"""
@@ -63,7 +73,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
| price double,
| ts long
|) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
+ | location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
@@ -83,17 +93,22 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
val commitTime = commits.apply(1).getString(0)
checkAnswer(s"""call create_savepoint('$tableName', '$commitTime')""")(Seq(true))
- // show savepoints
+ // Show savepoints using table name
val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
assertResult(1) {
savepoints.length
}
+ // Show savepoints using table base path
+ assertResult(1) {
+ spark.sql(s"""call show_savepoints(path => '$tablePath')""").collect().length
+ }
}
}
test("Test Call delete_savepoint Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
// create table
spark.sql(
s"""
@@ -103,7 +118,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
| price double,
| ts long
|) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
+ | location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
@@ -125,20 +140,26 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
})
- // delete savepoints
+ // Delete a savepoint with table name
checkAnswer(s"""call delete_savepoint('$tableName', '${commits.apply(1).getString(0)}')""")(Seq(true))
+ // Delete a savepoint with table base path
+ checkAnswer(
+ s"""call delete_savepoint(path => '$tablePath',
+ | instant_time => '${commits.apply(0).getString(0)}')""".stripMargin)(Seq(true))
- // show savepoints with only 2
+ // show_savepoints should return one savepoint
val savepoints = spark.sql(s"""call show_savepoints(table => '$tableName')""").collect()
- assertResult(2) {
+ assertResult(1) {
savepoints.length
}
+ assertResult(commits(2).getString(0))(savepoints(0).getString(0))
}
}
test("Test Call rollback_to_savepoint Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
// create table
spark.sql(
s"""
@@ -148,7 +169,7 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
| price double,
| ts long
|) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
+ | location '$tablePath'
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
@@ -158,19 +179,28 @@ class TestSavepointsProcedure extends HoodieSparkProcedureTestBase {
// insert data to table
spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
val commits = spark.sql(s"""call show_commits(table => '$tableName')""").collect()
- assertResult(2) {
+ .map(c => c.getString(0)).sorted
+ assertResult(3) {
commits.length
}
- // create 2 savepoints
- commits.foreach(r => {
- checkAnswer(s"""call create_savepoint('$tableName', '${r.getString(0)}')""")(Seq(true))
- })
-
- // rollback savepoints
- checkAnswer(s"""call rollback_to_savepoint('$tableName', '${commits.apply(0).getString(0)}')""")(Seq(true))
+ // create 2 savepoints
+ checkAnswer(s"""call create_savepoint('$tableName', '${commits(0)}')""")(Seq(true))
+ checkAnswer(s"""call create_savepoint('$tableName', '${commits(1)}')""")(Seq(true))
+
+ // rollback to the second savepoint with the table name
+ checkAnswer(s"""call rollback_to_savepoint('$tableName', '${commits(1)}')""")(Seq(true))
+ checkAnswer(
+ s"""call delete_savepoint(path => '$tablePath',
+ | instant_time => '${commits(1)}')""".stripMargin)(Seq(true))
+
+ // rollback to the first savepoint with the table base path
+ checkAnswer(
+ s"""call rollback_to_savepoint(path => '$tablePath',
+ | instant_time => '${commits(0)}')""".stripMargin)(Seq(true))
}
}
}