Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:53 UTC

[hudi] 15/17: [HUDI-5548] spark sql update hudi's table properties

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4bd7cf428bdcfababab67e2458b62cf20133454c
Author: XuQianJin-Stars <fo...@apache.com>
AuthorDate: Mon Jan 16 19:47:30 2023 +0800

    [HUDI-5548] spark sql update hudi's table properties
---
 .../command/ShowHoodieTablePropertiesCommand.scala | 61 ++++++++++++++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  1 +
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  9 ++++
 .../hudi/command/Spark31AlterTableCommand.scala    | 31 ++++++++++-
 .../sql/hudi/analysis/HoodieSpark3Analysis.scala   |  6 ++-
 .../spark/sql/hudi/command/AlterTableCommand.scala | 35 ++++++++++++-
 6 files changed, 138 insertions(+), 5 deletions(-)

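In short, this change makes ALTER TABLE SET/UNSET TBLPROPERTIES on a Hudi table write through to Hudi's own table config (the hoodie.properties file under the .hoodie metadata path) in addition to the Spark catalog, and routes SHOW TBLPROPERTIES to a new ShowHoodieTablePropertiesCommand that answers from that config. A minimal sketch of the user-facing statements, assuming an existing Hudi table; the table name hudi_tbl and the key key1 below are illustrative, not part of the patch:

    // properties set through Spark SQL are now also persisted to .hoodie/hoodie.properties
    spark.sql("ALTER TABLE hudi_tbl SET TBLPROPERTIES ('key1' = 'value1')")
    // SHOW TBLPROPERTIES is answered from Hudi's table config
    spark.sql("SHOW TBLPROPERTIES hudi_tbl").show(false)                        // all (key, value) pairs
    spark.sql("SHOW TBLPROPERTIES hudi_tbl ('hoodie.table.name')").show(false)  // a single value column
    // UNSET removes the keys from the catalog and from hoodie.properties
    spark.sql("ALTER TABLE hudi_tbl UNSET TBLPROPERTIES ('key1')")
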
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala
new file mode 100644
index 00000000000..8faa3354bea
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePropertiesCommand.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.collection.JavaConversions.mapAsScalaMap
+
+/**
+ * Command to show a Hudi table's properties.
+ */
+case class ShowHoodieTablePropertiesCommand(
+    tableIdentifier: TableIdentifier,
+    propertyKey: Option[String])
+  extends HoodieLeafRunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    val schema = AttributeReference("value", StringType, nullable = false)() :: Nil
+    propertyKey match {
+      case None => AttributeReference("key", StringType, nullable = false)() :: schema
+      case _ => schema
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    if (!sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) {
+      Seq.empty[Row]
+    } else {
+      val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
+      val tableProps = hoodieCatalogTable.metaClient.getTableConfig.getProps
+      propertyKey match {
+        case Some(p) =>
+          val propValue = tableProps
+            .getOrElse(p, s"Table ${tableIdentifier.unquotedString} does not have property: $p")
+          Seq(Row(propValue))
+        case None =>
+          tableProps.map(p => Row(p._1, p._2)).toSeq
+      }
+    }
+  }
+}
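Usage-wise, the command's output depends on whether a property key is supplied: with a key it returns a single "value" column, without one it returns every (key, value) pair found in hoodie.properties, and for a non-Hudi table it returns an empty result. A hedged usage sketch (hudi_tbl is an illustrative table name, not part of the patch):

    // single-key form: one row, one "value" column
    val name = spark.sql("SHOW TBLPROPERTIES hudi_tbl ('hoodie.table.name')")
      .collect()(0).getString(0)
    // no-key form: all entries of .hoodie/hoodie.properties as (key, value) rows
    spark.sql("SHOW TBLPROPERTIES hudi_tbl")
      .collect()
      .foreach(r => println(s"${r.getString(0)} = ${r.getString(1)}"))
    // asking for a missing key returns the "does not have property" message as the value
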
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c8add030981..e42bddbd102 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -610,6 +610,7 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case TruncateTableCommand(tableName, partitionSpec)
         if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
         TruncateHoodieTableCommand(tableName, partitionSpec)
+      case s: ShowTablePropertiesCommand => ShowHoodieTablePropertiesCommand(s.table, s.propertyKey)
       case _ => plan
     }
   }
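Note that, unlike the TruncateTableCommand case just above it, the new case is not guarded by sparkAdapter.isHoodieTable: every SHOW TBLPROPERTIES is rewritten, and the Hudi check happens inside ShowHoodieTablePropertiesCommand.run, which returns an empty result for non-Hudi tables. A small sketch of that behavior under this rewrite (table names are illustrative):

    // Hudi table: rows come from .hoodie/hoodie.properties
    spark.sql("SHOW TBLPROPERTIES hudi_tbl").count()       // > 0
    // non-Hudi table (e.g. plain parquet): run() short-circuits to Seq.empty,
    // so the result is empty instead of Spark's built-in catalog listing
    spark.sql("SHOW TBLPROPERTIES parquet_tbl").count()    // 0
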
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 65357b903b5..285fcc3f322 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -22,7 +22,10 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.sync.common.HoodieMetaSyncOperations
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.sync.common.HoodieMetaSyncOperations
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col}
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
@@ -188,6 +191,12 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
           assert(meta.comment.get.equals("it is a hudi table"))
           assert(Seq("key1", "key2").filter(meta.properties.contains(_)).size == 2)
+
+          // test show properties
+          assertResult(tableName) {
+            spark.sql(s"SHOW TBLPROPERTIES $tableName ('hoodie.table.name')").collect().apply(0).get(0)
+          }
+
           // test unset properties
           spark.sql(s"alter table $tableName unset tblproperties(comment, 'key1', 'key2')")
           val unsetMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
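The new assertion exercises the single-key form of SHOW TBLPROPERTIES. A sketch of a similar check for the no-key form (not part of the patch; it reuses the tableName variable from the surrounding test):

    // every entry of hoodie.properties should surface as a (key, value) row,
    // including the table name asserted on above
    val props = spark.sql(s"SHOW TBLPROPERTIES $tableName").collect()
      .map(r => r.getString(0) -> r.getString(1)).toMap
    assert(props.get("hoodie.table.name").contains(tableName))
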
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 529b5bb49ec..debcf0aadb8 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceUtils}
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.InternalSchema
@@ -48,6 +48,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColu
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.types.StructType
 
+import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -169,6 +170,12 @@ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChang
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+
+    // also remove these properties from the hoodie table's config (hoodie.properties)
+    val deleteProps: util.Set[String] = new util.HashSet[String]()
+    propKeys.foreach(v => deleteProps.add(v))
+    Spark31AlterTableCommand.deleteTableProperties(sparkSession, deleteProps, table)
+
     logInfo("table properties change finished")
   }
 
@@ -183,6 +190,12 @@ case class Spark31AlterTableCommand(table: CatalogTable, changes: Seq[TableChang
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+
+    // also upsert these properties into the hoodie table's config (hoodie.properties)
+    val updatedProps = new Properties
+    properties.foreach(u => updatedProps.setProperty(u._1, u._2))
+    Spark31AlterTableCommand.updateTableProperties(sparkSession, updatedProps, table)
+
     logInfo("table properties change finished")
   }
 
@@ -320,5 +333,21 @@ object Spark31AlterTableCommand extends Logging {
       }
     }
   }
+
+  def updateTableProperties(sparkSession: SparkSession, updatedProps: Properties, table: CatalogTable): Unit = {
+    val path = Spark31AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+  }
+
+  def deleteTableProperties(sparkSession: SparkSession, deletedProps: util.Set[String], table: CatalogTable): Unit = {
+    val path = Spark31AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath), deletedProps)
+  }
 }
 
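Both helpers build a HoodieTableMetaClient from the table location and apply the change against the .hoodie metadata path, so the catalog update and hoodie.properties stay in sync. A sketch of reading those properties back through the same APIs the helpers use (basePath is an illustrative table location, not part of the patch):

    import scala.collection.JavaConverters._
    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    // getProps reflects the current contents of .hoodie/hoodie.properties,
    // including keys written by updateTableProperties above
    metaClient.getTableConfig.getProps.asScala
      .foreach { case (k, v) => println(s"$k = $v") }
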
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
index 1f11caedb8c..ab6b9ebbbd5 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala
@@ -25,12 +25,14 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.connector.catalog.{Table, V1Table}
+import org.apache.spark.sql.execution.command.ShowTablePropertiesCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, removeMetaFields}
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
-import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, ShowHoodieTablePropertiesCommand, TruncateHoodieTableCommand}
+import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{SQLContext, SparkSession}
 
@@ -150,7 +152,7 @@ case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule
           purge,
           retainData = true
         )
-
+      case s: ShowTablePropertiesCommand => ShowHoodieTablePropertiesCommand(s.table, s.propertyKey)
       case _ => plan
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index bca3e7050c7..8095a26fc42 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
 import org.apache.hudi.{DataSourceOptionsHelper, DataSourceUtils}
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
 import org.apache.hudi.common.table.timeline.HoodieInstant.State
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.{CommitUtils, Option}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.internal.schema.InternalSchema
@@ -45,8 +45,11 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RemoveProperty, SetProperty}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.hudi.command.AlterTableCommand.getTableLocation
 import org.apache.spark.sql.types.StructType
 
+import java.util.Properties
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
@@ -185,13 +188,19 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
     // ignore NonExist unset
     propKeys.foreach { k =>
       if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
-        logWarning(s"find non exist unset property: ${k} , ignore it")
+        logWarning(s"find non exist unset property: $k , ignore it")
       }
     }
     val tableComment = if (propKeys.contains(TableCatalog.PROP_COMMENT)) None else table.comment
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+
+    // also remove these properties from the hoodie table's config (hoodie.properties)
+    val deleteProps: util.Set[String] = new util.HashSet[String]()
+    propKeys.foreach(v => deleteProps.add(v))
+    AlterTableCommand.deleteTableProperties(sparkSession, deleteProps, table)
+
     logInfo("table properties change finished")
   }
 
@@ -206,6 +215,12 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+
+    // also upsert these properties into the hoodie table's config (hoodie.properties)
+    val updatedProps = new Properties
+    properties.foreach(u => updatedProps.setProperty(u._1, u._2))
+    AlterTableCommand.updateTableProperties(sparkSession, updatedProps, table)
+
     logInfo("table properties change finished")
   }
 
@@ -343,5 +358,21 @@ object AlterTableCommand extends Logging {
       ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue
     ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
+
+  def updateTableProperties(sparkSession: SparkSession, updatedProps: Properties, table: CatalogTable): Unit = {
+    val path = AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+  }
+
+  def deleteTableProperties(sparkSession: SparkSession, deletedProps: util.Set[String], table: CatalogTable): Unit = {
+    val path = AlterTableCommand.getTableLocation(table, sparkSession)
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
+      .setConf(hadoopConf).build()
+    HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath), deletedProps)
+  }
 }
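The Spark 3.2+ path mirrors the Spark 3.1 one: setTableProperties and unsetTableProperties now call into HoodieTableConfig.update / HoodieTableConfig.delete so the catalog and hoodie.properties agree. An end-to-end sketch of the resulting behavior (illustrative table and key names, not part of the patch):

    // set: visible via SHOW TBLPROPERTIES and persisted to .hoodie/hoodie.properties
    spark.sql("ALTER TABLE hudi_tbl SET TBLPROPERTIES ('key1' = 'value1')")
    assert(spark.sql("SHOW TBLPROPERTIES hudi_tbl ('key1')").collect()(0).getString(0) == "value1")
    // unset: removed from the catalog and from hoodie.properties;
    // asking for the key afterwards yields the command's "does not have property" message
    spark.sql("ALTER TABLE hudi_tbl UNSET TBLPROPERTIES ('key1')")
    println(spark.sql("SHOW TBLPROPERTIES hudi_tbl ('key1')").collect()(0).getString(0))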