You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by me...@apache.org on 2022/06/15 08:50:25 UTC

[hudi] branch master updated: [HUDI-3499] Add Call Procedure for show rollbacks (#5848)

This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b946cf351 [HUDI-3499] Add Call Procedure for show rollbacks (#5848)
7b946cf351 is described below

commit 7b946cf35142139ab90320d943c90d905722989f
Author: superche <73...@users.noreply.github.com>
AuthorDate: Wed Jun 15 16:50:15 2022 +0800

    [HUDI-3499] Add Call Procedure for show rollbacks (#5848)
    
    * Add Call Procedure for show rollbacks
    
    * fix
    
    * add ut for show_rollback_detail and exception handle
    
    Co-authored-by: superche <su...@tencent.com>
---
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../procedures/ShowRollbacksProcedure.scala        | 147 +++++++++++++++++++++
 .../sql/hudi/procedure/TestCallProcedure.scala     | 105 +++++++++++++++
 3 files changed, 254 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 2b720bb94d..7cfeaaa0b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -45,6 +45,8 @@ object HoodieProcedures {
     mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
     mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
     mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
+    mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
+    mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
new file mode 100644
index 0000000000..e5cacdb062
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowRollbacksProcedure.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import java.io.IOException
+import java.util
+import java.util.function.Supplier
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline, TimelineMetadataUtils}
+import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.JavaConverters._
+
/**
 * Procedure implementation shared by `show_rollbacks` and `show_rollback_detail`.
 *
 * With `showDetails = false` it lists one summary row per commit rolled back on the
 * table's rollback timeline; with `showDetails = true` it lists one row per
 * (partition, file) touched by a single rollback instant.
 *
 * Fixes over the previous revision:
 *  - the caught IOException is now chained into the rethrown HoodieException so the
 *    root cause is not lost;
 *  - java-to-scala collection bridging is done explicitly via `asScala` instead of
 *    relying on the deprecated `JavaConversions` implicits;
 *  - result truncation uses plain Scala `take` instead of a java-stream round trip
 *    through `toArray` and per-element casts.
 */
class ShowRollbacksProcedure(showDetails: Boolean) extends BaseProcedure with ProcedureBuilder {
  // Parameters for show_rollbacks: table name plus an optional row limit.
  private val ROLLBACKS_PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
  )

  // Parameters for show_rollback_detail: additionally requires the rollback instant time.
  private val ROLLBACK_PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.required(2, "instant_time", DataTypes.StringType, None)
  )

  // Output schema for the summary listing: one row per rolled-back commit.
  private val ROLLBACKS_OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("rollback_instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_files_deleted", DataTypes.IntegerType, nullable = true, Metadata.empty),
    StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_partitions", DataTypes.IntegerType, nullable = true, Metadata.empty)
  ))

  // Output schema for the detail listing: one row per (partition, file) touched.
  private val ROLLBACK_OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("rollback_instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("deleted_file", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("succeeded", DataTypes.BooleanType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = if (showDetails) ROLLBACK_PARAMETERS else ROLLBACKS_PARAMETERS

  def outputType: StructType = if (showDetails) ROLLBACK_OUTPUT_TYPE else ROLLBACKS_OUTPUT_TYPE

  /**
   * Entry point invoked by the CALL command. Dispatches to the detail or summary
   * listing depending on the `showDetails` flag this instance was built with.
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(parameters, args)

    val tableName = getArgValueOrDefault(args, parameters(0))
    // `.get` is safe here: checkArgs validated required args and `limit` has a default.
    val limit = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[Int]

    val basePath = getBasePath(tableName)
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    // Timeline restricted to rollback actions only.
    val activeTimeline = new RollbackTimeline(metaClient)
    if (showDetails) {
      val instantTime = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[String]
      getRollbackDetail(activeTimeline, instantTime, limit)
    } else {
      getRollbacks(activeTimeline, limit)
    }
  }

  override def build: Procedure = new ShowRollbacksProcedure(showDetails)

  /** Active timeline filtered down to rollback instants. */
  class RollbackTimeline(metaClient: HoodieTableMetaClient) extends HoodieActiveTimeline(metaClient,
    CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION)) {
  }

  /**
   * Returns up to `limit` rows describing every file touched by the rollback at
   * `instantTime`: per partition, successfully deleted files first, then failed ones.
   *
   * @param activeTimeline timeline restricted to rollback actions
   * @param instantTime    timestamp of the completed rollback instant to inspect
   * @param limit          maximum number of rows to return
   */
  def getRollbackDetail(activeTimeline: RollbackTimeline,
                        instantTime: String,
                        limit: Int): Seq[Row] = {
    val metadata = TimelineMetadataUtils.deserializeAvroMetadata(activeTimeline.getInstantDetails(
      new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, instantTime)).get, classOf[HoodieRollbackMetadata])

    metadata.getPartitionMetadata.asScala.iterator.flatMap { case (partition, partitionMetadata) =>
      // Tag each file with its delete outcome, successes before failures,
      // matching the original emission order.
      val filesWithStatus =
        partitionMetadata.getSuccessDeleteFiles.asScala.map(f => (f, true)) ++
          partitionMetadata.getFailedDeleteFiles.asScala.map(f => (f, false))
      filesWithStatus.map { case (file, succeeded) =>
        Row(metadata.getStartRollbackTime, metadata.getCommitsRollback.toString,
          partition, file, succeeded)
      }
    }.take(limit).toList
  }

  /**
   * Returns up to `limit` summary rows, one per commit rolled back by each completed
   * rollback instant on the timeline.
   *
   * @throws HoodieException (with the IOException attached as cause) if a rollback's
   *                         metadata cannot be deserialized
   */
  def getRollbacks(activeTimeline: RollbackTimeline,
                   limit: Int): Seq[Row] = {
    val rollbacks = activeTimeline.getRollbackTimeline.filterCompletedInstants

    rollbacks.getInstants.iterator().asScala.flatMap { instant =>
      try {
        val metadata = TimelineMetadataUtils.deserializeAvroMetadata(
          activeTimeline.getInstantDetails(instant).get, classOf[HoodieRollbackMetadata])

        metadata.getCommitsRollback.asScala.map { rolledBack =>
          Row(metadata.getStartRollbackTime, rolledBack,
            metadata.getTotalFilesDeleted, metadata.getTimeTakenInMillis,
            if (metadata.getPartitionMetadata != null) metadata.getPartitionMetadata.size else 0)
        }
      } catch {
        case e: IOException =>
          // Chain the original exception so the root cause is preserved
          // (the previous revision dropped `e`).
          throw new HoodieException(s"Failed to get rollback's info from instant ${instant.getTimestamp}.", e)
      }
    }.take(limit).toList
  }
}
+
object ShowRollbacksProcedure {
  /** Registered procedure name for the summary listing variant. */
  val NAME = "show_rollbacks"

  /** Factory used by the procedure registry; each call yields a fresh builder. */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ShowRollbacksProcedure(false)
  }
}
+
object ShowRollbackDetailProcedure {
  /** Registered procedure name for the per-file detail variant. */
  val NAME = "show_rollback_detail"

  /** Factory used by the procedure registry; each call yields a fresh builder. */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ShowRollbacksProcedure(true)
  }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
index 848d09ab62..02b0e930fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
@@ -171,4 +171,109 @@ class TestCallProcedure extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Call show_rollbacks Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+
+      // 3 commits are left before rollback
+      var commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(3){commits.length}
+
+      // Call rollback_to_instant Procedure with Named Arguments
+      var instant_time = commits(0).get(0).toString
+      checkAnswer(s"""call rollback_to_instant(table => '$tableName', instant_time => '$instant_time')""")(Seq(true))
+      // Call rollback_to_instant Procedure with Positional Arguments
+      instant_time = commits(1).get(0).toString
+      checkAnswer(s"""call rollback_to_instant('$tableName', '$instant_time')""")(Seq(true))
+
+      // 1 commits are left after rollback
+      commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(1){commits.length}
+
+      // collect rollbacks for table
+      val rollbacks = spark.sql(s"""call show_rollbacks(table => '$tableName', limit => 10)""").collect()
+      assertResult(2) {rollbacks.length}
+    }
+  }
+
+  test("Test Call show_rollback_detail Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
+
+      // 3 commits are left before rollback
+      var commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(3) {
+        commits.length
+      }
+
+      // Call rollback_to_instant Procedure with Named Arguments
+      var instant_time = commits(0).get(0).toString
+      checkAnswer(s"""call rollback_to_instant(table => '$tableName', instant_time => '$instant_time')""")(Seq(true))
+      // Call rollback_to_instant Procedure with Positional Arguments
+      instant_time = commits(1).get(0).toString
+      checkAnswer(s"""call rollback_to_instant('$tableName', '$instant_time')""")(Seq(true))
+
+      // 1 commits are left after rollback
+      commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(1) {
+        commits.length
+      }
+
+      // collect rollbacks for table
+      val rollbacks = spark.sql(s"""call show_rollbacks(table => '$tableName', limit => 10)""").collect()
+      assertResult(2) {
+        rollbacks.length
+      }
+
+      // Check required fields
+      checkExceptionContain(s"""call show_rollback_detail(table => '$tableName')""")(
+        s"Argument: instant_time is required")
+
+      // collect rollback's info for table
+      instant_time = rollbacks(1).get(0).toString
+      val rollback = spark.sql(s"""call show_rollback_detail(table => '$tableName', instant_time => '$instant_time')""").collect()
+      assertResult(1) {
+        rollback.length
+      }
+    }
+  }
 }