Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/10/24 06:26:36 UTC

[kyuubi] branch master updated: [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 64578d133 [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
64578d133 is described below

commit 64578d13328db22405efb2347513301c0cec5ee5
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Oct 24 14:26:27 2023 +0800

    [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand for stored procedures
    
    ### _Why are the changes needed?_
    To close #5479
    Support Hudi CallProcedureHoodieCommand; the procedure syntax is documented at https://hudi.apache.org/docs/procedures/
    
    - CallProcedureHoodieCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CallProcedureHoodieCommand.scala
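    
    As a hedged illustration (not part of this patch), the CALL grammar covered by these extractors looks like the following; `spark` is assumed to be a SparkSession with the Hudi and Kyuubi authz extensions enabled, and the database/table names are placeholders:
    
    ```scala
    // copy_to_table: the `table` argument is resolved as an input table (read access),
    // and `new_table` as an output table (UPDATE action), per table_command_spec.json.
    spark.sql(
      "CALL copy_to_table(table => 'hudi_ns.hudi_table', new_table => 'hudi_ns.hudi_table_bak')")
    
    // show_table_properties: only the `table` argument is resolved, as an input table.
    spark.sql("CALL show_table_properties(table => 'hudi_ns.hudi_table')")
    ```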
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No
    
    Closes #5502 from AngersZhuuuu/KYUUBI-5479.
    
    Closes #5479
    
    583df35b4 [Angerszhuuuu] Update tableExtractors.scala
    6b51a9d6c [Bowen Liang] refactor extractors in a more idiomatic Scala way, and use lookupExtractor to reuse the StringTableExtractor singleton
    cde7992da [Angerszhuuuu] [KYUUBI #5479][AUTHZ] Support Hudi CallProcedureHoodieCommand
    
    Lead-authored-by: Angerszhuuuu <an...@gmail.com>
    Co-authored-by: Bowen Liang <li...@gf.com.cn>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 ....kyuubi.plugin.spark.authz.serde.TableExtractor |   2 +
 .../src/main/resources/table_command_spec.json     |  31 +++
 .../plugin/spark/authz/serde/tableExtractors.scala | 227 ++++++++++++++++++++-
 .../plugin/spark/authz/gen/HudiCommands.scala      |  19 ++
 .../HudiCatalogRangerSparkExtensionSuite.scala     |  50 +++++
 5 files changed, 328 insertions(+), 1 deletion(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index 33c8b8759..dc35a8f51 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -21,6 +21,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureInputTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureOutputTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 0b09c902f..f4d5eb60a 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1513,6 +1513,37 @@
   } ],
   "opType" : "ALTERTABLE_PROPERTIES",
   "queryDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand",
+  "tableDescs" : [ {
+    "fieldName" : "clone",
+    "fieldExtractor" : "HudiCallProcedureInputTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "OTHER"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : true,
+    "setCurrentDatabaseIfMissing" : true
+  }, {
+    "fieldName" : "clone",
+    "fieldExtractor" : "HudiCallProcedureOutputTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : true
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.hudi.command.CompactionHoodieTableCommand",
   "tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index fcd8c8207..2c212cc5c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -18,14 +18,17 @@
 package org.apache.kyuubi.plugin.spark.authz.serde
 
 import java.util.{Map => JMap}
+import java.util.LinkedHashMap
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.util.reflect.ReflectUtils._
@@ -266,3 +269,225 @@ class HudiMergeIntoTargetTableExtractor extends TableExtractor {
     }
   }
 }
+
+abstract class HudiCallProcedureTableExtractor extends TableExtractor {
+
+  protected def extractTableIdentifier(
+      procedure: AnyRef,
+      args: AnyRef,
+      tableParameterKey: String): Option[String] = {
+    val tableIdentifierParameter =
+      invokeAs[Array[AnyRef]](procedure, "parameters")
+        .find(invokeAs[String](_, "name").equals(tableParameterKey))
+        .getOrElse(throw new IllegalArgumentException(s"Could not find param $tableParameterKey"))
+    val tableIdentifierParameterIndex = invokeAs[LinkedHashMap[String, Int]](args, "map")
+      .getOrDefault(tableParameterKey, INVALID_INDEX)
+    tableIdentifierParameterIndex match {
+      case INVALID_INDEX =>
+        None
+      case argsIndex =>
+        val dataType = invokeAs[DataType](tableIdentifierParameter, "dataType")
+        val row = invokeAs[InternalRow](args, "internalRow")
+        val tableName = InternalRow.getAccessor(dataType, true)(row, argsIndex)
+        Option(tableName.asInstanceOf[UTF8String].toString)
+    }
+  }
+
+  case class ProcedureArgsInputOutputPair(
+      input: Option[String] = None,
+      output: Option[String] = None)
+
+  protected val PROCEDURE_CLASS_PATH = "org.apache.spark.sql.hudi.command.procedures"
+
+  protected val INVALID_INDEX = -1
+
+  // These pairs are used to get the procedure input/output args which user passed in call command.
+  protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputPair] = Map(
+    (
+      s"$PROCEDURE_CLASS_PATH.ArchiveCommitsProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.CommitsCompareProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.CopyToTableProcedure",
+      ProcedureArgsInputOutputPair(
+        input = Some("table"),
+        output = Some("new_table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.CopyToTempViewProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.CreateMetadataTableProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.CreateSavepointProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.DeleteMarkerProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.DeleteMetadataTableProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.DeleteSavepointProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ExportInstantsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.HdfsParquetImportProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.HelpProcedure",
+      ProcedureArgsInputOutputPair()),
+    (
+      s"$PROCEDURE_CLASS_PATH.HiveSyncProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.InitMetadataTableProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RepairAddpartitionmetaProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RepairCorruptedCleanFilesProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RepairDeduplicateProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RepairMigratePartitionMetaProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RepairOverwriteHoodiePropsProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RollbackToInstantTimeProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RollbackToSavepointProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RunBootstrapProcedure",
+      ProcedureArgsInputOutputPair(
+        input = Some("table"),
+        output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RunCleanProcedure",
+      ProcedureArgsInputOutputPair(
+        input = Some("table"),
+        output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RunClusteringProcedure",
+      ProcedureArgsInputOutputPair(
+        input = Some("table"),
+        output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.RunCompactionProcedure",
+      ProcedureArgsInputOutputPair(
+        input = Some("table"),
+        output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowArchivedCommitsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowBootstrapMappingProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowClusteringProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowCommitExtraMetadataProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowCommitFilesProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowCommitPartitionsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowCommitWriteStatsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowCompactionProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowFileSystemViewProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowFsPathDetailProcedure",
+      ProcedureArgsInputOutputPair()),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileMetadataProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileRecordsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowInvalidParquetProcedure",
+      ProcedureArgsInputOutputPair()),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowMetadataTableFilesProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowMetadataTablePartitionsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowMetadataTableStatsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowRollbacksProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowSavepointsProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ShowTablePropertiesProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.StatsFileSizeProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.StatsWriteAmplificationProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.UpgradeOrDowngradeProcedure",
+      ProcedureArgsInputOutputPair(output = Some("table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ValidateHoodieSyncProcedure",
+      ProcedureArgsInputOutputPair(
+        input = Some("src_table"),
+        output = Some("dst_table"))),
+    (
+      s"$PROCEDURE_CLASS_PATH.ValidateMetadataTableFilesProcedure",
+      ProcedureArgsInputOutputPair(input = Some("table"))))
+}
+
+class HudiCallProcedureOutputTableExtractor
+  extends HudiCallProcedureTableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    val procedure = invokeAs[AnyRef](v1, "procedure")
+    val args = invokeAs[AnyRef](v1, "args")
+    procedureArgsInputOutputPairs.get(procedure.getClass.getName)
+      .filter(_.output.isDefined)
+      .map { argsPairs =>
+        val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.output.get)
+        lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
+      }
+  }
+}
+
+class HudiCallProcedureInputTableExtractor
+  extends HudiCallProcedureTableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    val procedure = invokeAs[AnyRef](v1, "procedure")
+    val args = invokeAs[AnyRef](v1, "args")
+    procedureArgsInputOutputPairs.get(procedure.getClass.getName)
+      .filter(_.input.isDefined)
+      .map { argsPairs =>
+        val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.input.get)
+        lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
+      }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
index 522059f27..7909bef9b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala
@@ -200,12 +200,31 @@ object HudiCommands {
     TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDescs))
   }
 
+  val CallProcedureHoodieCommand = {
+    val cmd = "org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand"
+    TableCommandSpec(
+      cmd,
+      Seq(
+        TableDesc(
+          "clone",
+          classOf[HudiCallProcedureInputTableExtractor],
+          actionTypeDesc = Some(ActionTypeDesc(actionType = Some(OTHER))),
+          isInput = true,
+          setCurrentDatabaseIfMissing = true),
+        TableDesc(
+          "clone",
+          classOf[HudiCallProcedureOutputTableExtractor],
+          actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))),
+          setCurrentDatabaseIfMissing = true)))
+  }
+
   val data: Array[TableCommandSpec] = Array(
     AlterHoodieTableAddColumnsCommand,
     AlterHoodieTableChangeColumnCommand,
     AlterHoodieTableDropPartitionCommand,
     AlterHoodieTableRenameCommand,
     AlterTableCommand,
+    CallProcedureHoodieCommand,
     CreateHoodieTableAsSelectCommand,
     CreateHoodieTableCommand,
     CreateHoodieTableLikeCommand,
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index fd7acd129..7e7c3ad9e 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -472,4 +472,54 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       }
     }
   }
+
+  test("CallProcedureHoodieCommand") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq(
+        (s"$namespace1.$table1", "table"),
+        (s"$namespace1.$table2", "table"),
+        (namespace1, "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(
+          admin,
+          sql(
+            s"""
+               |CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string)
+               |USING HUDI
+               |OPTIONS (
+               | type = 'cow',
+               | primaryKey = 'id',
+               | 'hoodie.datasource.hive_sync.enable' = 'false'
+               |)
+               |PARTITIONED BY(city)
+               |""".stripMargin))
+        doAs(
+          admin,
+          sql(
+            s"""
+               |CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string)
+               |USING HUDI
+               |OPTIONS (
+               | type = 'cow',
+               | primaryKey = 'id',
+               | 'hoodie.datasource.hive_sync.enable' = 'false'
+               |)
+               |PARTITIONED BY(city)
+               |""".stripMargin))
+
+        val copy_to_table =
+          s"CALL copy_to_table(table => '$namespace1.$table1', new_table => '$namespace1.$table2')"
+        interceptContains[AccessControlException] {
+          doAs(someone, sql(copy_to_table))
+        }(s"does not have [select] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(copy_to_table))
+
+        val show_table_properties = s"CALL show_table_properties(table => '$namespace1.$table1')"
+        interceptContains[AccessControlException] {
+          doAs(someone, sql(show_table_properties))
+        }(s"does not have [select] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(show_table_properties))
+      }
+    }
+  }
 }