You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:28 UTC
[hudi] 18/45: [HUDI-5105] Add Call show_commit_extra_metadata for spark sql (#7091)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 90c09053da765e735fee90a19ede2d78dba62a2b
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Oct 31 18:21:29 2022 +0800
[HUDI-5105] Add Call show_commit_extra_metadata for spark sql (#7091)
* [HUDI-5105] Add Call show_commit_extra_metadata for spark sql
(cherry picked from commit 79ad3571db62b51e8fe8cc9183c8c787e9ef57fe)
---
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../ShowCommitExtraMetadataProcedure.scala | 138 +++++++++++++++++++++
.../sql/hudi/procedure/TestCommitsProcedure.scala | 54 +++++++-
3 files changed, 187 insertions(+), 6 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b308480c6d..fabfda9367 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -82,6 +82,7 @@ object HoodieProcedures {
,(HiveSyncProcedure.NAME, HiveSyncProcedure.builder)
,(BackupInvalidParquetProcedure.NAME, BackupInvalidParquetProcedure.builder)
,(CopyToTempView.NAME, CopyToTempView.builder)
+ ,(ShowCommitExtraMetadataProcedure.NAME, ShowCommitExtraMetadataProcedure.builder)
)
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
new file mode 100644
index 0000000000..1a8f4dd9e4
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitExtraMetadataProcedure.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.Supplier
+import scala.collection.JavaConversions._
+
/**
 * Spark SQL procedure `show_commit_extra_metadata`: lists the extra-metadata
 * key/value pairs recorded in a commit's metadata.
 *
 * Arguments:
 *  - table (required): Hudi table name.
 *  - limit (optional, default 100): max number of rows returned.
 *  - instant_time (optional): commit instant to inspect; defaults to the
 *    latest completed instant on the commits timeline.
 *  - metadata_key (optional): restrict output to this single metadata key.
 *
 * Output rows: (instant_time, action, metadata_key, metadata_value).
 */
class ShowCommitExtraMetadataProcedure() extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
    ProcedureParameter.optional(2, "instant_time", DataTypes.StringType, None),
    ProcedureParameter.optional(3, "metadata_key", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("metadata_key", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("metadata_value", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    val instantTime = getArgValueOrDefault(args, PARAMETERS(2))
    val metadataKey = getArgValueOrDefault(args, PARAMETERS(3))

    val hoodieCatalogTable = HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants

    // Printable form of the requested instant for error messages; the raw
    // Option would render as "Some(...)"/"None" and confuse users.
    val requestedInstant = instantTime.map(_.asInstanceOf[String]).getOrElse("(latest)")

    // Resolve the target instant: an explicit instant_time if given,
    // otherwise the most recent completed instant on the timeline.
    val hoodieInstantOption: Option[HoodieInstant] = instantTime match {
      case Some(ts) => getCommitForInstant(timeline, ts.asInstanceOf[String])
      case None => getCommitForLastInstant(timeline)
    }

    if (hoodieInstantOption.isEmpty) {
      throw new HoodieException(s"Commit $requestedInstant not found in Commits $timeline.")
    }

    val commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption)

    if (commitMetadataOptional.isEmpty) {
      throw new HoodieException(s"Commit $requestedInstant not found commitMetadata in Commits $timeline.")
    }

    val meta = commitMetadataOptional.get
    val timestamp: String = hoodieInstantOption.get.getTimestamp
    val action: String = hoodieInstantOption.get.getAction
    // Optional key filter; trimmed to tolerate accidental whitespace in the argument.
    val wantedKey: Option[String] = metadataKey.map(_.asInstanceOf[String].trim)

    // Walk the (java) extra-metadata map directly and stop once `limit`
    // matching rows have been collected — no intermediate java list/stream.
    val rows = Seq.newBuilder[Row]
    var emitted = 0
    val entries = meta.getExtraMetadata.entrySet().iterator()
    while (entries.hasNext && emitted < limit) {
      val entry = entries.next()
      if (wantedKey.forall(_ == entry.getKey)) {
        rows += Row(timestamp, action, entry.getKey, entry.getValue)
        emitted += 1
      }
    }
    rows.result()
  }

  override def build: Procedure = new ShowCommitExtraMetadataProcedure()

  /** Returns the newest completed instant on the timeline, if any. */
  private def getCommitForLastInstant(timeline: HoodieTimeline): Option[HoodieInstant] = {
    val latest = timeline.getReverseOrderedInstants.findFirst
    if (latest.isPresent) Some(latest.get()) else None
  }

  /**
   * Looks up `instantTime` among the actions that can carry commit metadata:
   * commit, replacecommit and deltacommit.
   */
  private def getCommitForInstant(timeline: HoodieTimeline, instantTime: String): Option[HoodieInstant] = {
    Seq(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)
      .map(action => new HoodieInstant(false, action, instantTime))
      .find(instant => timeline.containsInstant(instant))
  }

  /**
   * Deserializes the instant's details into the proper metadata class:
   * replacecommit instants carry [[HoodieReplaceCommitMetadata]], all others
   * plain [[HoodieCommitMetadata]].
   */
  private def getHoodieCommitMetadata(timeline: HoodieTimeline, hoodieInstant: Option[HoodieInstant]): Option[HoodieCommitMetadata] = {
    hoodieInstant match {
      case Some(instant) if instant.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION =>
        Option(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get,
          classOf[HoodieReplaceCommitMetadata]))
      case Some(instant) =>
        Option(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get,
          classOf[HoodieCommitMetadata]))
      case None =>
        Option.empty
    }
  }
}
+
object ShowCommitExtraMetadataProcedure {
  // Name under which the procedure is registered, i.e. CALL show_commit_extra_metadata(...).
  val NAME = "show_commit_extra_metadata"

  /** Factory registered with the procedure catalog; yields a fresh builder per call. */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new ShowCommitExtraMetadataProcedure
  }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
index 2840b22434..03cf26800d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCommitsProcedure.scala
@@ -61,9 +61,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
// collect archived commits for table
val endTs = commits(0).get(0).toString
val archivedCommits = spark.sql(s"""call show_archived_commits(table => '$tableName', end_ts => '$endTs')""").collect()
- assertResult(4) {
- archivedCommits.length
- }
+ assertResult(4){archivedCommits.length}
}
}
@@ -109,9 +107,7 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
// collect archived commits for table
val endTs = commits(0).get(0).toString
val archivedCommits = spark.sql(s"""call show_archived_commits_metadata(table => '$tableName', end_ts => '$endTs')""").collect()
- assertResult(4) {
- archivedCommits.length
- }
+ assertResult(4){archivedCommits.length}
}
}
@@ -288,4 +284,50 @@ class TestCommitsProcedure extends HoodieSparkProcedureTestBase {
assertResult(1){result.length}
}
}
+
+ test("Test Call show_commit_extra_metadata Procedure") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // Check required fields
+ checkExceptionContain(s"""call show_commit_extra_metadata()""")(
+ s"arguments is empty")
+
+ // collect commits for table
+ val commits = spark.sql(s"""call show_commits(table => '$tableName', limit => 10)""").collect()
+ assertResult(2){commits.length}
+
+ val instant_time = commits(0).get(0).toString
+ // get specify instantTime's extraMetadatas
+ val metadatas1 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', instant_time => '$instant_time')""").collect()
+ assertResult(true){metadatas1.length > 0}
+
+ // get last instantTime's extraMetadatas
+ val metadatas2 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName')""").collect()
+ assertResult(true){metadatas2.length > 0}
+
+ // get last instantTime's extraMetadatas and filter extraMetadatas with metadata_key
+ val metadatas3 = spark.sql(s"""call show_commit_extra_metadata(table => '$tableName', metadata_key => 'schema')""").collect()
+ assertResult(1){metadatas3.length}
+ }
+ }
}