Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:28 UTC

[hudi] 18/45: [HUDI-5105] Add Call show_commit_extra_metadata for spark sql (#7091)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 90c09053da765e735fee90a19ede2d78dba62a2b
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Oct 31 18:21:29 2022 +0800

    [HUDI-5105] Add Call show_commit_extra_metadata for spark sql (#7091)
    
    * [HUDI-5105] Add Call show_commit_extra_metadata for spark sql
    
    (cherry picked from commit 79ad3571db62b51e8fe8cc9183c8c787e9ef57fe)
---
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../ShowCommitExtraMetadataProcedure.scala         | 138 +++++++++++++++++++++
 .../sql/hudi/procedure/TestCommitsProcedure.scala  |  54 +++++++-
 3 files changed, 187 insertions(+), 6 deletions(-)
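
The new procedure takes a required table argument plus optional limit, instant_time and metadata_key arguments, and returns one row per extra-metadata entry with columns (instant_time, action, metadata_key, metadata_value). A minimal usage sketch, assuming an active SparkSession `spark` with the Hudi extensions enabled (table name and instant time are illustrative):

    // show all extra metadata of the latest completed commit
    spark.sql("call show_commit_extra_metadata(table => 'hudi_table')").show(false)

    // restrict to one instant and one metadata key
    spark.sql(
      """call show_commit_extra_metadata(
        |  table => 'hudi_table',
        |  instant_time => '20221031182129000',
        |  metadata_key => 'schema',
        |  limit => 10)""".stripMargin).show(false)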

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b308480c6d..fabfda9367 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -82,6 +82,7 @@ object HoodieProcedures {
       ,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
       ,(BackupInvalidParquetProcedure.NAME, BackupInvalidParquetProcedure.builder)
       ,(CopyToTempView.NAME, CopyToTempView.builder)
+      ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
     )
   }
 }
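
For reference, procedures are registered here as (name, builder) pairs, and the runtime resolves a call by name and builds a fresh procedure instance from its builder. A small sketch of that path, using only the members visible in this patch (assumes the Procedure and ProcedureBuilder types from the same package are in scope):

    import java.util.function.Supplier

    // Build the procedure the same way the registry would after a name lookup.
    val supplier: Supplier[ProcedureBuilder] = ShowCommitExtraMetadataProcedure.builder
    val procedure: Procedure = supplier.get().build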
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
new file mode 100644
index 0000000000..1a8f4dd9e4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
+class ShowCommitExtraMetadataProcedure() extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
+    ProcedureParameter.optional(2, "instant_time", DataTypes.StringType, None),
+    ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("metadata_key", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("metadata_value", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
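+  // Resolve the target instant (explicit instant_time, else the latest completed
+  // commit), load its commit metadata, and emit one row per extra-metadata entry.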
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(2))
+    val metadataKey = getArgValueOrDefault(args, PARAMETERS(3))
+
+    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val activeTimeline = metaClient.getActiveTimeline
+    val timeline = activeTimeline.getCommitsTimeline.filterCompletedInstants
+
+    val hoodieInstantOption: Option[HoodieInstant] = if (instantTime.isEmpty) {
+      getCommitForLastInstant(timeline)
+    } else {
+      getCommitForInstant(timeline, instantTime.get.asInstanceOf[String])
+    }
+
+    if (hoodieInstantOption.isEmpty) {
+      throw new HoodieException(s"Commit $instantTime not found in Commits $timeline.")
+    }
+
+    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)
+
+    if (commitMetadataOptional.isEmpty) {
+      throw new HoodieException(s"Commit $instantTime not found commitMetadata in Commits $timeline.")
+    }
+
+    val meta = commitMetadataOptional.get
+    val timestamp: String = hoodieInstantOption.get.getTimestamp
+    val action: String = hoodieInstantOption.get.getAction
+    val metadatas: util.Map[String, String] = if (metadataKey.isEmpty) {
+      meta.getExtraMetadata
+    } else {
+      meta.getExtraMetadata.filter(r => r._1.equals(metadataKey.get.asInstanceOf[String].trim))
+    }
+
+    val rows = new util.ArrayList[Row]
+    metadatas.foreach(r => rows.add(Row(timestamp, action, r._1, r._2)))
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowCommitExtraMetadataProcedure()
+
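+  // Fall back to the most recent completed instant when no instant_time is given.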
+  private def getCommitForLastInstant(timeline: HoodieTimeline): Option[HoodieInstant] = {
+    val instantOptional = timeline.getReverseOrderedInstants
+      .findFirst
+    if (instantOptional.isPresent) {
+      Option.apply(instantOptional.get())
+    } else {
+      Option.empty
+    }
+  }
+
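+  // The action type of the requested instant is not known up front, so probe the
+  // timeline for a commit, replacecommit or deltacommit at that instant time.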
+  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
+    val instants: util.List[HoodieInstant] = util.Arrays.asList(
+      new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
+      new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime))
+
+    val hoodieInstant: Option[HoodieInstant] = instants.find((i: HoodieInstant) => timeline.containsInstant(i))
+    hoodieInstant
+  }
+
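+  // Replace commits serialize HoodieReplaceCommitMetadata rather than
+  // HoodieCommitMetadata, so pick the deserialization target by action.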
+  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
+    if (hoodieInstant.isDefined) {
+      if (hoodieInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieReplaceCommitMetadata]))
+      } else {
+        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get).get,
+          classOf[HoodieCommitMetadata]))
+      }
+    } else {
+      Option.empty
+    }
+  }
+}
+
+object ShowCommitExtraMetadataProcedure {
+  val NAME = "show_commit_extra_metadata"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowCommitExtraMetadataProcedure()
+  }
+}
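
As a quick check of the output shape, the rows can be consumed like any other procedure result; column positions follow OUTPUT_TYPE above. A hedged sketch, assuming an active SparkSession `spark` (table name is illustrative):

    // Collect the latest commit's extra metadata into a plain key -> value map.
    val extraMetadata: Map[String, String] = spark
      .sql("call show_commit_extra_metadata(table => 'hudi_table')")
      .collect()
      .map(row => (row.getString(2), row.getString(3))) // metadata_key -> metadata_value
      .toMap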
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
index 2840b22434..03cf26800d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -61,9 +61,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
       // collect archived commits for table
       val endTs = commits(0).get(0).toString
       val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
-      assertResult(4) {
-        archivedCommits.length
-      }
+      assertResult(4){archivedCommits.length}
     }
   }
 
@@ -109,9 +107,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
       // collect archived commits for table
       val endTs = commits(0).get(0).toString
       val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
-      assertResult(4) {
-        archivedCommits.length
-      }
+      assertResult(4){archivedCommits.length}
     }
   }
 
@@ -288,4 +284,50 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
       assertResult(1){result.length}
     }
   }
+
+  test("Test Call show_commit_extra_metadata Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+     """.stripMargin)
+
+      // insert data into the table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_commit_extra_metadata()""")(
+        s"arguments is empty")
+
+      // collect commits for table
+      val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+      assertResult(2){commits.length}
+
+      val instant_time = commits(0).get(0).toString
+      // get the specified instant's extra metadata
+      val metadatas1 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', instant_time => '$instant_time')""").collect()
+      assertResult(true){metadatas1.length > 0}
+
+      // get the latest instant's extra metadata
+      val metadatas2 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName')""").collect()
+      assertResult(true){metadatas2.length > 0}
+
+      // get the latest instant's extra metadata, filtered by metadata_key
+      val metadatas3 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', metadata_key => 'schema')""").collect()
+      assertResult(1){metadatas3.length}
+    }
+  }
 }
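
One note on the metadata_key case above: Hudi records the commit's write schema under the "schema" extra-metadata key, which is why filtering on that key yields exactly one row. A small sketch for pulling it out, assuming an active SparkSession `spark` (table name is illustrative):

    // Read the schema JSON recorded by the latest commit.
    val schemaJson: String = spark
      .sql("call show_commit_extra_metadata(table => 'hudi_table', metadata_key => 'schema')")
      .collect()(0)
      .getString(3)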