You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:53 UTC
[hudi] 15/17: [HUDI-5548] spark sql update hudi's table properties
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4bd7cf428bdcfababab67e2458b62cf20133454c
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Mon Jan 16 19:47:30 2023 +0800
[HUDI-5548] spark sql update hudi's table properties
---
.../command/ShowHoodieTablePropertiesCommand.scala | 61 ++++++++++++++++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 1 +
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 9 ++++
.../hudi/command/Spark31AlterTableCommand.scala | 31 ++++++++++-
.../sql/hudi/analysis/HoodieSpark3Analysis.scala | 6 ++-
.../spark/sql/hudi/command/AlterTableCommand.scala | 35 ++++++++++++-
6 files changed, 138 insertions(+), 5 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala
new file mode 100644
index 00000000000..8faa3354bea
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.collection.JavaConversions.mapAsScalaMap
+
+/**
+ * Command for SHOW TBLPROPERTIES. A Hudi table reports the properties held in its table config
+ * (read through the meta client); any other table or view reports its catalog properties.
+ */
+case class ShowHoodieTablePropertiesCommand(
+    tableIdentifier: TableIdentifier,
+    propertyKey: Option[String])
+  extends HoodieLeafRunnableCommand {
+
+  // A single `value` column when one key is requested, otherwise `key` and `value` columns.
+  override val output: Seq[Attribute] = {
+    val value = AttributeReference("value", StringType, nullable = false)()
+    if (propertyKey.isDefined) value :: Nil
+    else AttributeReference("key", StringType, nullable = false)() :: value :: Nil
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // Resolves temporary views as well as permanent tables, so non-Hudi relations still work.
+    val catalogTable = sparkSession.sessionState.catalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
+    val props: Map[String, String] = if (sparkAdapter.isHoodieTable(catalogTable)) {
+      val tableConfig = HoodieCatalogTable(sparkSession, catalogTable).metaClient.getTableConfig
+      tableConfig.getProps.map { case (k, v) => k.toString -> v.toString }.toMap
+    } else {
+      catalogTable.properties
+    }
+    propertyKey match {
+      case Some(p) =>
+        Seq(Row(props.getOrElse(p, s"Table ${tableIdentifier.unquotedString} does not have property: $p")))
+      case None =>
+        props.map { case (k, v) => Row(k, v) }.toSeq
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c8add030981..e42bddbd102 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -610,6 +610,7 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case TruncateTableCommand(tableName, partitionSpec)
if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
TruncateHoodieTableCommand(tableName, partitionSpec)
+ case s: ShowTablePropertiesCommand => ShowHoodieTablePropertiesCommand(s.table, s.propertyKey)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 65357b903b5..285fcc3f322 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -22,7 +22,10 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.sync.common.HoodieMetaSyncOperations
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.sync.common.HoodieMetaSyncOperations
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{arrays_zip, col}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
@@ -188,6 +191,12 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
assert(meta.comment.get.equals("it is a hudi table"))
assert(Seq("key1", "key2").filter(meta.properties.contains(_)).size == 2)
+
+ // test show properties
+ assertResult(tableName) {
+ spark.sql(s"SHOW TBLPROPERTIES $tableName ('hoodie.table.name')").collect().apply(0).get(0)
+ }
+
// test unset propertes
spark.sql(s"alter table $tableName unset tblproperties(comment, 'key1', 'key2')")
val unsetMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 529b5bb49ec..debcf0aadb8 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceUtils}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.InternalSchema
@@ -48,6 +48,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColu
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.types.StructType
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -169,6 +170,12 @@ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChang
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties, comment = tableComment)
catalog.alterTable(newTable)
+
+ // remove the unset keys from the hoodie table's config file
+ val deleteProps: util.Set[String] = new util.HashSet[String]()
+ propKeys.foreach(v => deleteProps.add(v))
+ Spark31AlterTableCommand.deleteTableProperties(sparkSession, deleteProps, table)
+
logInfo("table properties change finished")
}
@@ -183,6 +190,12 @@ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChang
properties = table.properties ++ properties,
comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
catalog.alterTable(newTable)
+
+ // upsert the new properties into the hoodie table's config file
+ val updatedProps = new Properties
+ properties.foreach(u => updatedProps.setProperty(u._1, u._2))
+ Spark31AlterTableCommand.updateTableProperties(sparkSession, updatedProps, table)
+
logInfo("table properties change finished")
}
@@ -320,5 +333,21 @@ object Spark31AlterTableCommand extends Logging {
}
}
}
+
+  /** Upserts `updatedProps` into the Hudi table config file under the table's meta path. */
+  def updateTableProperties(sparkSession: SparkSession, updatedProps: Properties, table: CatalogTable): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(Spark31AlterTableCommand.getTableLocation(table, sparkSession))
+      .setConf(sparkSession.sessionState.newHadoopConf()).build()
+    HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+  }
+
+  /** Removes the keys in `deletedProps` from the Hudi table config file under the table's meta path. */
+  def deleteTableProperties(sparkSession: SparkSession, deletedProps: util.Set[String], table: CatalogTable): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(Spark31AlterTableCommand.getTableLocation(table, sparkSession))
+      .setConf(sparkSession.sessionState.newHadoopConf()).build()
+    HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath), deletedProps)
+  }
}
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
index 1f11caedb8c..ab6b9ebbbd5 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -25,12 +25,14 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.{Table, V1Table}
+import org.apache.spark.sql.execution.command.ShowTablePropertiesCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, removeMetaFields}
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
-import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, ShowHoodieTablePropertiesCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -150,7 +152,7 @@ case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule
purge,
retainData = true
)
-
+ case s: ShowTablePropertiesCommand => ShowHoodieTablePropertiesCommand(s.table, s.propertyKey)
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index bca3e7050c7..8095a26fc42 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.{DataSourceOptionsHelper, DataSourceUtils}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, Option}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.InternalSchema
@@ -45,8 +45,11 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.command.AlterTableCommand.getTableLocation
import org.apache.spark.sql.types.StructType
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -185,13 +188,19 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
// ignore NonExist unset
propKeys.foreach { k =>
if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
- logWarning(s"find non exist unset property: ${k} , ignore it")
+ logWarning(s"find non exist unset property: $k , ignore it")
}
}
val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment
val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.copy(properties = newProperties, comment = tableComment)
catalog.alterTable(newTable)
+
+ // remove the unset keys from the hoodie table's config file
+ val deleteProps: util.Set[String] = new util.HashSet[String]()
+ propKeys.foreach(v => deleteProps.add(v))
+ AlterTableCommand.deleteTableProperties(sparkSession, deleteProps, table)
+
logInfo("table properties change finished")
}
@@ -206,6 +215,12 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
properties = table.properties ++ properties,
comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
catalog.alterTable(newTable)
+
+ // upsert the new properties into the hoodie table's config file
+ val updatedProps = new Properties
+ properties.foreach(u => updatedProps.setProperty(u._1, u._2))
+ AlterTableCommand.updateTableProperties(sparkSession, updatedProps, table)
+
logInfo("table properties change finished")
}
@@ -343,5 +358,21 @@ object AlterTableCommand extends Logging {
ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
+
+  /** Upserts `updatedProps` into the Hudi table config file under the table's meta path. */
+  def updateTableProperties(sparkSession: SparkSession, updatedProps: Properties, table: CatalogTable): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(AlterTableCommand.getTableLocation(table, sparkSession))
+      .setConf(sparkSession.sessionState.newHadoopConf()).build()
+    HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+  }
+
+  /** Removes the keys in `deletedProps` from the Hudi table config file under the table's meta path. */
+  def deleteTableProperties(sparkSession: SparkSession, deletedProps: util.Set[String], table: CatalogTable): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(AlterTableCommand.getTableLocation(table, sparkSession))
+      .setConf(sparkSession.sessionState.newHadoopConf()).build()
+    HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath), deletedProps)
+  }
}