You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/10/24 06:26:36 UTC
[kyuubi] branch master updated: [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 64578d133 [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
64578d133 is described below
commit 64578d13328db22405efb2347513301c0cec5ee5
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Oct 24 14:26:27 2023 +0800
[KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
### _Why are the changes needed?_
To close #5479
Support Hudi CallProcedureHoodieCommand; syntax reference: https://hudi.apache.org/docs/procedures/
- CallProcedureHoodieCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CallProcedureHoodieCommand.scala
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No
Closes #5502 from AngersZhuuuu/KYUUBI-5479.
Closes #5479
583df35b4 [Angerszhuuuu] Update tableExtractors.scala
6b51a9d6c [Bowen Liang] refactor extractors in more scala way, and use lookupExtractor for reusing StringTableExtractor singleton
cde7992da [Angerszhuuuu] [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand
Lead-authored-by: Angerszhuuuu <an...@gmail.com>
Co-authored-by: Bowen Liang <li...@gf.com.cn>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
....kyuubi.plugin.spark.authz.serde.TableExtractor | 2 +
.../src/main/resources/table_command_spec.json | 31 +++
.../plugin/spark/authz/serde/tableExtractors.scala | 227 ++++++++++++++++++++-
.../plugin/spark/authz/gen/HudiCommands.scala | 19 ++
.../HudiCatalogRangerSparkExtensionSuite.scala | 50 +++++
5 files changed, 328 insertions(+), 1 deletion(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index 33c8b8759..dc35a8f51 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -21,6 +21,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureInputTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureOutputTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 0b09c902f..f4d5eb60a 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1513,6 +1513,37 @@
} ],
"opType" : "ALTERTABLE_PROPERTIES",
"queryDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand",
+ "tableDescs" : [ {
+ "fieldName" : "clone",
+ "fieldExtractor" : "HudiCallProcedureInputTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "OTHER"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : true,
+ "setCurrentDatabaseIfMissing" : true
+ }, {
+ "fieldName" : "clone",
+ "fieldExtractor" : "HudiCallProcedureOutputTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : true
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.CompactionHoodieTableCommand",
"tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index fcd8c8207..2c212cc5c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -18,14 +18,17 @@
package org.apache.kyuubi.plugin.spark.authz.serde
import java.util.{Map => JMap}
+import java.util.LinkedHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._
@@ -266,3 +269,225 @@ class HudiMergeIntoTargetTableExtractor extends TableExtractor {
}
}
}
+
+abstract class HudiCallProcedureTableExtractor extends TableExtractor {
+
+ protected def extractTableIdentifier(
+ procedure: AnyRef,
+ args: AnyRef,
+ tableParameterKey: String): Option[String] = {
+ val tableIdentifierParameter =
+ invokeAs[Array[AnyRef]](procedure, "parameters")
+ .find(invokeAs[String](_, "name").equals(tableParameterKey))
+ .getOrElse(throw new IllegalArgumentException(s"Could not find param $tableParameterKey"))
+ val tableIdentifierParameterIndex = invokeAs[LinkedHashMap[String, Int]](args, "map")
+ .getOrDefault(tableParameterKey, INVALID_INDEX)
+ tableIdentifierParameterIndex match {
+ case INVALID_INDEX =>
+ None
+ case argsIndex =>
+ val dataType = invokeAs[DataType](tableIdentifierParameter, "dataType")
+ val row = invokeAs[InternalRow](args, "internalRow")
+ val tableName = InternalRow.getAccessor(dataType, true)(row, argsIndex)
+ Option(tableName.asInstanceOf[UTF8String].toString)
+ }
+ }
+
+ case class ProcedureArgsInputOutputPair(
+ input: Option[String] = None,
+ output: Option[String] = None)
+
+ protected val PROCEDURE_CLASS_PATH = "org.apache.spark.sql.hudi.command.procedures"
+
+ protected val INVALID_INDEX = -1
+
+ // These pairs are used to get the procedure input/output args which user passed in call command.
+ protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputPair] = Map(
+ (
+ s"$PROCEDURE_CLASS_PATH.ArchiveCommitsProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.CommitsCompareProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.CopyToTableProcedure",
+ ProcedureArgsInputOutputPair(
+ input = Some("table"),
+ output = Some("new_table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.CopyToTempViewProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.CreateMetadataTableProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.CreateSavepointProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.DeleteMarkerProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.DeleteMetadataTableProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.DeleteSavepointProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ExportInstantsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.HdfsParquetImportProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.HelpProcedure",
+ ProcedureArgsInputOutputPair()),
+ (
+ s"$PROCEDURE_CLASS_PATH.HiveSyncProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.InitMetadataTableProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RepairAddpartitionmetaProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RepairCorruptedCleanFilesProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RepairDeduplicateProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RepairMigratePartitionMetaProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RepairOverwriteHoodiePropsProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RollbackToInstantTimeProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RollbackToSavepointProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RunBootstrapProcedure",
+ ProcedureArgsInputOutputPair(
+ input = Some("table"),
+ output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RunCleanProcedure",
+ ProcedureArgsInputOutputPair(
+ input = Some("table"),
+ output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RunClusteringProcedure",
+ ProcedureArgsInputOutputPair(
+ input = Some("table"),
+ output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.RunCompactionProcedure",
+ ProcedureArgsInputOutputPair(
+ input = Some("table"),
+ output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowArchivedCommitsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowBootstrapMappingProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowClusteringProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowCommitExtraMetadataProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowCommitFilesProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowCommitPartitionsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowCommitWriteStatsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowCompactionProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowFileSystemViewProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowFsPathDetailProcedure",
+ ProcedureArgsInputOutputPair()),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileMetadataProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileRecordsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowInvalidParquetProcedure",
+ ProcedureArgsInputOutputPair()),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowMetadataTableFilesProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowMetadataTablePartitionsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowMetadataTableStatsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowRollbacksProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowSavepointsProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ShowTablePropertiesProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.StatsFileSizeProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.StatsWriteAmplificationProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.UpgradeOrDowngradeProcedure",
+ ProcedureArgsInputOutputPair(output = Some("table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ValidateHoodieSyncProcedure",
+ ProcedureArgsInputOutputPair(
+ input = Some("src_table"),
+ output = Some("dst_table"))),
+ (
+ s"$PROCEDURE_CLASS_PATH.ValidateMetadataTableFilesProcedure",
+ ProcedureArgsInputOutputPair(input = Some("table"))))
+}
+
+class HudiCallProcedureOutputTableExtractor
+ extends HudiCallProcedureTableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ val procedure = invokeAs[AnyRef](v1, "procedure")
+ val args = invokeAs[AnyRef](v1, "args")
+ procedureArgsInputOutputPairs.get(procedure.getClass.getName)
+ .filter(_.output.isDefined)
+ .map { argsPairs =>
+ val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.output.get)
+ lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
+ }
+ }
+}
+
+class HudiCallProcedureInputTableExtractor
+ extends HudiCallProcedureTableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ val procedure = invokeAs[AnyRef](v1, "procedure")
+ val args = invokeAs[AnyRef](v1, "args")
+ procedureArgsInputOutputPairs.get(procedure.getClass.getName)
+ .filter(_.input.isDefined)
+ .map { argsPairs =>
+ val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.input.get)
+ lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
index 522059f27..7909bef9b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
@@ -200,12 +200,31 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDescs))
}
+ val CallProcedureHoodieCommand = {
+ val cmd = "org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand"
+ TableCommandSpec(
+ cmd,
+ Seq(
+ TableDesc(
+ "clone",
+ classOf[HudiCallProcedureInputTableExtractor],
+ actionTypeDesc = Some(ActionTypeDesc(actionType = Some(OTHER))),
+ isInput = true,
+ setCurrentDatabaseIfMissing = true),
+ TableDesc(
+ "clone",
+ classOf[HudiCallProcedureOutputTableExtractor],
+ actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))),
+ setCurrentDatabaseIfMissing = true)))
+ }
+
val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
AlterHoodieTableDropPartitionCommand,
AlterHoodieTableRenameCommand,
AlterTableCommand,
+ CallProcedureHoodieCommand,
CreateHoodieTableAsSelectCommand,
CreateHoodieTableCommand,
CreateHoodieTableLikeCommand,
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index fd7acd129..7e7c3ad9e 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -472,4 +472,54 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}
+
+ test("CallProcedureHoodieCommand") {
+ withSingleCallEnabled {
+ withCleanTmpResources(Seq(
+ (s"$namespace1.$table1", "table"),
+ (s"$namespace1.$table2", "table"),
+ (namespace1, "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(
+ admin,
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
+ |USING HUDI
+ |OPTIONS (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | 'hoodie.datasource.hive_sync.enable' = 'false'
+ |)
+ |PARTITIONED BY(city)
+ |""".stripMargin))
+ doAs(
+ admin,
+ sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
+ |USING HUDI
+ |OPTIONS (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | 'hoodie.datasource.hive_sync.enable' = 'false'
+ |)
+ |PARTITIONED BY(city)
+ |""".stripMargin))
+
+ val copy_to_table =
+ s"CALL copy_to_table(table => '$namespace1.$table1', new_table => '$namespace1.$table2')"
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(copy_to_table))
+ }(s"does not have [select] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(copy_to_table))
+
+ val show_table_properties = s"CALL show_table_properties(table => '$namespace1.$table1')"
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(show_table_properties))
+ }(s"does not have [select] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(show_table_properties))
+ }
+ }
+ }
}