Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/09/14 10:17:28 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3424] [FEATURE] [AUTHZ] Access privilege checks for namespaces and tables of DatasourceV2

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aacbb754 [KYUUBI #3424] [FEATURE] [AUTHZ] Access privilege checks for namespaces and tables of DatasourceV2
6aacbb754 is described below

commit 6aacbb754448ac32d05288ab1d1bb1e6b78bd438
Author: Bowen Liang <li...@gf.com.cn>
AuthorDate: Wed Sep 14 18:17:17 2022 +0800

    [KYUUBI #3424] [FEATURE] [AUTHZ] Access privilege checks for namespaces and tables of DatasourceV2
    
    ### _Why are the changes needed?_
    
    close #3424.
    
    Covering the following V2 commands (a usage sketch follows the lists):
    - v2Commands
      - CreateNamespace: CREATE DATABASE
      - DropNamespace: DROP DATABASE
      - CommentOnNamespace
      - CreateTable
      - CreateV2Table
      - CreateAsSelect
      - UpdateTable
      - AppendData: Insert Using a SELECT Statement / VALUES / TABLE
      - OverwriteByExpression: Insert Overwrite
      - OverwritePartitionsDynamic
      - CacheTable
      - DropTable
      - TruncateTable
      - AlterTable
      - MergeIntoTable
      - RepairTable
    
    - v2AlterTableCommands
      - CommentOnTable
      - AddColumns
      - DropColumns
      - RenameColumn
      - AlterColumn
      - ReplaceColumns
    
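    For reviewers, the dispatch is centralized in the new `v2Commands` enum: both `OperationType.apply` and `PrivilegesBuilder` first ask `v2Commands.accept` whether a plan's node name is a supported v2 command and, if accepted, delegate to its `CmdPrivilegeBuilder`. A minimal Scala sketch of that call pattern follows; the helper names `operationTypeOf` and `buildV2Privileges` are illustrative only, not part of this patch:

        import scala.collection.mutable.ArrayBuffer

        import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

        import org.apache.kyuubi.plugin.spark.authz.{v2Commands, PrivilegeObject}
        import org.apache.kyuubi.plugin.spark.authz.OperationType.OperationType

        // Map an accepted v2 command plan to its OperationType; v1 commands keep
        // the existing string-match handling in OperationType.apply.
        def operationTypeOf(plan: LogicalPlan): Option[OperationType] = {
          val cmd = plan.nodeName
          if (v2Commands.accept(cmd)) Some(v2Commands.withName(cmd).operationType) else None
        }

        // Collect the input/output privilege objects for an accepted v2 command,
        // mirroring the hook added to PrivilegesBuilder in this patch.
        def buildV2Privileges(
            plan: LogicalPlan): (Seq[PrivilegeObject], Seq[PrivilegeObject]) = {
          val inputObjs = ArrayBuffer[PrivilegeObject]()
          val outputObjs = ArrayBuffer[PrivilegeObject]()
          v2Commands.withName(plan.nodeName).buildPrivileges(plan, inputObjs, outputObjs)
          (inputObjs.toSeq, outputObjs.toSeq)
        }
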
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3425 from bowenliang123/3424-authz-access-dsv2.
    
    Closes #3424
    
    c35ee4ef [Bowen Liang] fix: renamed to HasQueryAsLogicalPlan. add comment for name rule
    8b4324b6 [Bowen Liang] fix import
    4c67dd25 [Bowen Liang] adding comments of class and methods
    dc23b68c [Bowen Liang] nit: rename field name cmdTypes to commandTypes, param name p to plan
    3e05617d [liangbowen] fix enum withName error by reverting command enum to extend super.Val
    4c099063 [liangbowen] rename V2Command to CmdPrivilegeBuilder
    ef9613c4 [liangbowen] rename all the CommandTypes without the "v2" prefix. move V2CommandType into the v2Commands object and rename to CommandType.
    11f06454 [Bowen Liang] nit: reorder fields of V2Command
    f27f732b [Bowen Liang] nit: remove extends super.Val
    6008e20c [Bowen Liang] nit: rename operType to operationType of V2Command
    9727102d [Bowen Liang] change leastVer and mostVer from String to Option[String]
    ef3cb0d1 [Bowen Liang] moved implemented v2Commands names from OperationType match cases
    00eb90c3 [Bowen Liang] support AddPartitions,DropPartitions,RenamePartitions,TruncatePartition in v2Commands
    84813a5d [Bowen Liang] nit
    29cb69a2 [Bowen Liang] support MergeIntoTable in v2Commands
    c9bf97fa [Bowen Liang] support RepairTable in v2Commands
    f31abee3 [Bowen Liang] remove operType for TruncateTable
    a48f6e23 [Bowen Liang] rename handle method name of v2Commands to buildCommand
    afe03dbc [Bowen Liang] nit: optimize imports of v2Commands
    f4d3a0ac [Bowen Liang] revert mergeProjectionV1Table method name to mergeProjection for less change to existing code
    99ac63c7 [Bowen Liang] support CommentOnNamespace, CommentOnTable in v2Commands
    b17bcc69 [Bowen Liang] update ut for TRUNCATE TABLE to isSparkV32OrGreater
    35843948 [Bowen Liang] overload applyFilterAndMasking method for V2 Identifier
    3e31a339 [Bowen Liang] support TruncatedTable in v2Commands
    01f6d6b0 [Bowen Liang] revert DropNamespace to private. add ut for create database / drop database
    49453081 [Bowen Liang] revert tablePrivileges to private
    b8380086 [Bowen Liang] refactor OverwriteByExpression, OverwritePartitionsDynamic to v2Commands
    023210c6 [Bowen Liang] refactor AlterTable to v2Commands
    b499d75b [Bowen Liang] revert method name `v1TablePrivileges` back to tablePrivileges
    eb678cc7 [Bowen Liang] move v2TablePrivileges method to v2Commands
    40c5a1cd [Bowen Liang] nit
    a6d2224b [Bowen Liang] refactor AlterColumn, DropColumns, ReplaceColumns, RenameColumn to v2Commands
    aaadee32 [Bowen Liang] nit
    c307ee76 [Bowen Liang] refactor CacheTable, CacheTableAsSelect to v2Commands
    974001db [Bowen Liang] refactor AppendData, UpdateTable, DeleteFromTable to v2Commands
    ed82898c [Bowen Liang] refactor DropTable to v2Commands
    0eabfb09 [Bowen Liang] refactor support for CreateTableAsSelect, ReplaceTable, ReplaceTableAsSelect commands. add support for HasQuery cmdType.
    22535b29 [Bowen Liang] introduce cmdTypes in v2Commands and support V2CreateTablePlan. support CreateTable, CreateV2Table commands.
    39e4aaa1 [Bowen Liang] generalized privileges builder for v2commands with enum and reusable builder for inputObjs and outputObjs
    e32ffdf5 [Bowen Liang] nit
    e09bacc3 [Bowen Liang] change projectV2Table and mergeProjectionWithIdentifier to mergeProjectionV1Table and mergeProjectionV2Table
    f36d5b77 [Bowen Liang] change tablePrivileges and tablePrivilegesWithIdentifier to v1TablePrivileges and v2TablePrivileges
    7820b13f [Bowen Liang] optimize getTableIdentifierFromIdentifier
    906270c3 [Bowen Liang] use the spark version as condition instead
    32d108e2 [Bowen Liang] fix ut for changes in mergeProjectionWithIdentifier
    3ab1ed88 [Bowen Liang] reuse plan for columns in mergeProjectionWithIdentifier
    8ad9f712 [Bowen Liang] remove redundant actionType param for mergeProjectionWithIdentifier method
    094a297a [Bowen Liang] remove test-jar dependency on spark-catalyst_${scala.binary.version}
    53d16391 [Bowen Liang] remove test-jar dependency on spark-catalyst_${scala.binary.version}
    70595e34 [Bowen Liang] reformat
    a3a2429c [Bowen Liang] fix
    aa26b566 [Bowen Liang] move ut on JdbcTableCatalog to V2JdbcTableCatalogRangerSparkExtensionSuite.scala file
    0f0dc41f [liangbowen] change to use class name in string for org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
    82ee98af [liangbowen] nit
    128c092f [liangbowen] nit
    af2462ce [liangbowen] change to use JDBCTableCatalog as V2TableCatalog. ut pass for spark 3.1+
    e612b82b [liangbowen] nit
    98eae971 [liangbowen] fix ut
    f1b37a3f [Bowen Liang] support AddColumns/DropColumns/RenameColumn/AlterColumn and assume spark3.2+ for ut
    7cdbcfe4 [Bowen Liang] Revert "change to import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog for 3.0+"
    4afa3178 [Bowen Liang] change to import org.apache.spark.sql.catalyst.catalog.InMemoryCatalog for 3.0+
    4b68ea00 [Bowen Liang] separate dsv2 asserts into cases.
    06cf1681 [Bowen Liang] update getDatasourceV2Identifier and mergeProjectionWithIdentifier to use Identifier directly
    af81a308 [Bowen Liang] update getDatasourceV2Identifier and mergeProjectionWithIdentifier to use Identifier directly
    179e5aff [liangbowen] ut for cache table
    4b64d7e8 [liangbowen] remove redundant "CreateTableAsSelect" in buildCommand
    d4eda229 [liangbowen] nit
    3e02eba0 [Bowen Liang] add support and ut for delete v2 catalog table
    4718fc29 [Bowen Liang] add support and ut for create/drop/select v2 catalog table
    ccaf744a [liangbowen] separate getDatasourceV2ColumnNames method
    1d0d5131 [liangbowen] initial support for access checks for dsv2 queries
    
    Lead-authored-by: Bowen Liang <li...@gf.com.cn>
    Co-authored-by: liangbowen <li...@gf.com.cn>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../kyuubi/plugin/spark/authz/OperationType.scala  |  15 +-
 .../plugin/spark/authz/PrivilegesBuilder.scala     |  52 ++-
 .../ranger/RuleApplyRowFilterAndDataMasking.scala  |   8 +
 .../plugin/spark/authz/util/AuthZUtils.scala       |  15 +-
 .../kyuubi/plugin/spark/authz/v2Commands.scala     | 359 +++++++++++++++++++++
 .../authz/ranger/RangerSparkExtensionSuite.scala   |   6 +-
 ...JdbcTableCatalogRangerSparkExtensionSuite.scala | 348 ++++++++++++++++++++
 7 files changed, 755 insertions(+), 48 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/OperationType.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/OperationType.scala
index ab0a61451..2da4637bc 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/OperationType.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/OperationType.scala
@@ -37,11 +37,12 @@ object OperationType extends Enumeration {
    */
   def apply(clzName: String): OperationType = {
     clzName match {
+      case v2Cmd if v2Commands.accept(v2Cmd) =>
+        v2Commands.withName(v2Cmd).operationType
+
       case "AddArchivesCommand" => EXPLAIN
       case "AddFilesCommand" => EXPLAIN
       case "AddJarsCommand" => EXPLAIN
-      case "AddPartitions" => ALTERTABLE_ADDPARTS
-      case "AlterColumn" => ALTERTABLE_REPLACECOLS
       case "AlterDatabasePropertiesCommand" |
           "SetNamespaceProperties" => ALTERDATABASE
       case "AlterDatabaseSetLocationCommand" |
@@ -61,11 +62,9 @@ object OperationType extends Enumeration {
           "AlterTableUnsetPropertiesCommand" => ALTERTABLE_PROPERTIES
       case ava if ava.contains("AlterViewAs") => ALTERVIEW_AS
       case ac if ac.startsWith("Analyze") => ANALYZE_TABLE
-      case "AppendData" => ALTERTABLE_ADDPARTS
-      case "CreateDatabaseCommand" | "CreateNamespace" => CREATEDATABASE
+      case "CreateDatabaseCommand" => CREATEDATABASE
       case "CreateFunctionCommand" | "CreateFunction" => CREATEFUNCTION
-      case "CreateTableAsSelect" |
-          "CreateDataSourceTableAsSelectCommand" |
+      case "CreateDataSourceTableAsSelectCommand" |
           "CreateHiveTableAsSelectCommand" |
           "OptimizedCreateHiveTableAsSelectCommand" => CREATETABLE_AS_SELECT
       case "CreateTableCommand" |
@@ -73,9 +72,7 @@ object OperationType extends Enumeration {
           "CreateTableLikeCommand" => CREATETABLE
       case "CreateViewCommand" |
           "CacheTableCommand" |
-          "CreateTempViewUsing" |
-          "CacheTable" |
-          "CacheTableAsSelect" => CREATEVIEW
+          "CreateTempViewUsing" => CREATEVIEW
       case "DescribeDatabaseCommand" | "DescribeNamespace" => DESCDATABASE
       case "DescribeFunctionCommand" => DESCFUNCTION
       case "DescribeColumnCommand" | "DescribeTableCommand" => DESCTABLE
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index 9bc3264a3..2ec8fd7ca 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.analysis.{PersistedView, ViewType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, Filter, Join, LogicalPlan, Project, Sort, Window}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.StructField
 
@@ -33,10 +34,11 @@ import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
 import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType._
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.plugin.spark.authz.util.PermanentViewMarker
+import org.apache.kyuubi.plugin.spark.authz.v2Commands.v2TablePrivileges
 
 object PrivilegesBuilder {
 
-  private def databasePrivileges(db: String): PrivilegeObject = {
+  def databasePrivileges(db: String): PrivilegeObject = {
     PrivilegeObject(DATABASE, PrivilegeObjectActionType.OTHER, db, db)
   }
 
@@ -88,7 +90,7 @@ object PrivilegesBuilder {
    * @param privilegeObjects input or output spark privilege object list
    * @param projectionList Projection list after pruning
    */
-  private def buildQuery(
+  def buildQuery(
       plan: LogicalPlan,
       privilegeObjects: ArrayBuffer[PrivilegeObject],
       projectionList: Seq[NamedExpression] = Nil,
@@ -106,6 +108,16 @@ object PrivilegesBuilder {
       }
     }
 
+    def mergeProjectionV2Table(table: Identifier, plan: LogicalPlan): Unit = {
+      if (projectionList.isEmpty) {
+        privilegeObjects += v2TablePrivileges(table, plan.output.map(_.name))
+      } else {
+        val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
+          .filter(plan.outputSet.contains).map(_.name).distinct
+        privilegeObjects += v2TablePrivileges(table, cols)
+      }
+    }
+
     plan match {
       case p: Project => buildQuery(p.child, privilegeObjects, p.projectList, conditionList)
 
@@ -138,6 +150,12 @@ object PrivilegesBuilder {
           mergeProjection(t, plan)
         }
 
+      case datasourceV2Relation if hasResolvedDatasourceV2Table(datasourceV2Relation) =>
+        val tableIdent = getDatasourceV2Identifier(datasourceV2Relation)
+        if (tableIdent.isDefined) {
+          mergeProjectionV2Table(tableIdent.get, plan)
+        }
+
       case u if u.nodeName == "UnresolvedRelation" =>
         val tableNameM = u.getClass.getMethod("tableName")
         val parts = tableNameM.invoke(u).asInstanceOf[String].split("\\.")
@@ -191,6 +209,9 @@ object PrivilegesBuilder {
     }
 
     plan.nodeName match {
+      case v2Cmd if v2Commands.accept(v2Cmd) =>
+        v2Commands.withName(v2Cmd).buildPrivileges(plan, inputObjs, outputObjs)
+
       case "AlterDatabasePropertiesCommand" |
           "AlterDatabaseSetLocationCommand" |
           "CreateDatabaseCommand" |
@@ -290,22 +311,9 @@ object PrivilegesBuilder {
           inputObjs += databasePrivileges(db.get)
         }
 
-      case "CacheTable" =>
-        val query = getPlanField[LogicalPlan]("table") // table to cache
-        buildQuery(query, inputObjs)
-
       case "CacheTableCommand" =>
         getPlanField[Option[LogicalPlan]]("plan").foreach(buildQuery(_, inputObjs))
 
-      case "CacheTableAsSelect" =>
-        val query = getPlanField[LogicalPlan]("plan")
-        buildQuery(query, inputObjs)
-
-      case "CreateNamespace" =>
-        val resolvedNamespace = getPlanField[Any]("name")
-        val databases = getFieldVal[Seq[String]](resolvedNamespace, "nameParts")
-        outputObjs += databasePrivileges(quote(databases))
-
       case "CreateViewCommand" =>
         if (getPlanField[ViewType]("viewType") == PersistedView) {
           val view = getPlanField[TableIdentifier]("name")
@@ -350,18 +358,6 @@ object PrivilegesBuilder {
         val database = getFieldVal[Seq[String]](child, "nameParts")
         inputObjs += databasePrivileges(quote(database))
 
-      case "CreateTableAsSelect" |
-          "ReplaceTableAsSelect" =>
-        val left = getPlanField[LogicalPlan]("name")
-        left.nodeName match {
-          case "ResolvedDBObjectName" =>
-            val nameParts = getPlanField[Seq[String]]("nameParts")
-            val db = Some(quote(nameParts.init))
-            outputObjs += tablePrivileges(TableIdentifier(nameParts.last, db))
-          case _ =>
-        }
-        buildQuery(getQuery, inputObjs)
-
       case "CreateTableLikeCommand" =>
         val target = setCurrentDBIfNecessary(getPlanField[TableIdentifier]("targetTable"), spark)
         val source = setCurrentDBIfNecessary(getPlanField[TableIdentifier]("sourceTable"), spark)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
index b18ce83e3..8fe7bcd70 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.Identifier
 
 import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -55,6 +56,13 @@ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[Logical
     }
   }
 
+  private def applyFilterAndMasking(
+      plan: LogicalPlan,
+      identifier: Identifier,
+      spark: SparkSession): LogicalPlan = {
+    applyFilterAndMasking(plan, getTableIdentifierFromV2Identifier(identifier), spark)
+  }
+
   private def applyFilterAndMasking(
       plan: LogicalPlan,
       identifier: TableIdentifier,
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index b7977713a..e157a8314 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
+import org.apache.spark.sql.connector.catalog.Identifier
 
 private[authz] object AuthZUtils {
 
@@ -98,14 +99,12 @@ private[authz] object AuthZUtils {
     plan.nodeName == "DataSourceV2Relation" && plan.resolved
   }
 
-  def getDatasourceV2Identifier(plan: LogicalPlan): Option[TableIdentifier] = {
-    // avoid importing DataSourceV2Relation for Spark version compatibility
-    val identifier = getFieldVal[Option[AnyRef]](plan, "identifier")
-    identifier.map { id =>
-      val namespaces = invoke(id, "namespace").asInstanceOf[Array[String]]
-      val table = invoke(id, "name").asInstanceOf[String]
-      TableIdentifier(table, Some(quote(namespaces)))
-    }
+  def getDatasourceV2Identifier(plan: LogicalPlan): Option[Identifier] = {
+    getFieldVal[Option[Identifier]](plan, "identifier")
+  }
+
+  def getTableIdentifierFromV2Identifier(id: Identifier): TableIdentifier = {
+    TableIdentifier(id.name(), Some(quote(id.namespace())))
   }
 
   def hasResolvedPermanentView(plan: LogicalPlan): Boolean = {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
new file mode 100644
index 000000000..90035d1f5
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType.PrivilegeObjectActionType
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType.TABLE_OR_VIEW
+import org.apache.kyuubi.plugin.spark.authz.PrivilegesBuilder._
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
+import org.apache.kyuubi.plugin.spark.authz.v2Commands.CommandType.{CommandType, HasChildAsIdentifier, HasQueryAsLogicalPlan, HasTableAsIdentifier, HasTableAsIdentifierOption, HasTableNameAsIdentifier}
+
+/**
+ * Building privilege objects
+ * for Spark Datasource V2 commands
+ */
+object v2Commands extends Enumeration {
+
+  /**
+   * Command type enum
+   * with naming rule as `HasFieldAsReturnType`
+   * for hinting privileges building of inputObjs or outputObjs
+   */
+  object CommandType extends Enumeration {
+    type CommandType = Value
+    val HasChildAsIdentifier, HasQueryAsLogicalPlan, HasTableAsIdentifier,
+        HasTableAsIdentifierOption, HasTableNameAsIdentifier = Value
+  }
+
+  import scala.language.implicitConversions
+
+  implicit def valueToCmdPrivilegeBuilder(x: Value): CmdPrivilegeBuilder =
+    x.asInstanceOf[CmdPrivilegeBuilder]
+
+  /**
+   * check whether commandName is implemented with supported privilege builders
+   * and pass the requirement checks (e.g. Spark version)
+   *
+   * @param commandName name of command
+   * @return true if so, false else
+   */
+  def accept(commandName: String): Boolean = {
+    try {
+      val command = v2Commands.withName(commandName)
+
+      // check spark version requirements
+      def passSparkVersionCheck: Boolean =
+        (command.mostVer.isEmpty || isSparkVersionAtMost(command.mostVer.get)) &&
+          (command.leastVer.isEmpty || isSparkVersionAtLeast(command.leastVer.get))
+
+      passSparkVersionCheck
+    } catch {
+      case _: NoSuchElementException => false
+    }
+  }
+
+  val defaultBuildInput: (LogicalPlan, ArrayBuffer[PrivilegeObject], Seq[CommandType]) => Unit =
+    (plan, inputObjs, commandTypes) => {
+      commandTypes.foreach {
+        case HasQueryAsLogicalPlan =>
+          val query = getFieldVal[LogicalPlan](plan, "query")
+          buildQuery(query, inputObjs)
+        case _ =>
+      }
+    }
+
+  val defaultBuildOutput: (
+      LogicalPlan,
+      ArrayBuffer[PrivilegeObject],
+      Seq[CommandType],
+      PrivilegeObjectActionType) => Unit =
+    (plan, outputObjs, commandTypes, outputObjsActionType) => {
+      commandTypes.foreach {
+        case HasTableNameAsIdentifier =>
+          val table = invoke(plan, "tableName").asInstanceOf[Identifier]
+          outputObjs += v2TablePrivileges(table)
+
+        case HasTableAsIdentifierOption =>
+          val table = getFieldVal[AnyRef](plan, "table")
+          val tableIdent = getFieldVal[Option[Identifier]](table, "identifier")
+          if (tableIdent.isDefined) {
+            outputObjs += v2TablePrivileges(tableIdent.get, actionType = outputObjsActionType)
+          }
+
+        case HasTableAsIdentifier =>
+          val table = getFieldVal[LogicalPlan](plan, "table")
+          val tableIdent = getFieldVal[Identifier](table, "identifier")
+          outputObjs += v2TablePrivileges(tableIdent)
+
+        case HasChildAsIdentifier =>
+          val table = getFieldVal[AnyRef](plan, "child")
+          val tableIdent = getFieldVal[Identifier](table, "identifier")
+          outputObjs += v2TablePrivileges(tableIdent)
+
+        case _ =>
+      }
+    }
+
+  /**
+   * Command privilege builder
+   *
+   * @param operationType    OperationType for converting accessType
+   * @param leastVer         minimum Spark version required
+   * @param mostVer          maximum Spark version supported
+   * @param commandTypes     Seq of [[CommandType]] hinting privilege building
+   * @param buildInput       input [[PrivilegeObject]] for privilege check
+   * @param buildOutput      output [[PrivilegeObject]] for privilege check
+   * @param outputActionType [[PrivilegeObjectActionType]] for output [[PrivilegeObject]]
+   */
+  case class CmdPrivilegeBuilder(
+      operationType: OperationType = QUERY,
+      leastVer: Option[String] = None,
+      mostVer: Option[String] = None,
+      commandTypes: Seq[CommandType] = Seq.empty,
+      buildInput: (LogicalPlan, ArrayBuffer[PrivilegeObject], Seq[CommandType]) => Unit =
+        defaultBuildInput,
+      buildOutput: (
+          LogicalPlan,
+          ArrayBuffer[PrivilegeObject],
+          Seq[CommandType],
+          PrivilegeObjectActionType) => Unit = defaultBuildOutput,
+      outputActionType: PrivilegeObjectActionType = PrivilegeObjectActionType.OTHER)
+    extends super.Val {
+
+    def buildPrivileges(
+        plan: LogicalPlan,
+        inputObjs: ArrayBuffer[PrivilegeObject],
+        outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
+      this.buildInput(plan, inputObjs, commandTypes)
+      this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
+    }
+  }
+
+  def v2TablePrivileges(
+      table: Identifier,
+      columns: Seq[String] = Nil,
+      actionType: PrivilegeObjectActionType = PrivilegeObjectActionType.OTHER): PrivilegeObject = {
+    PrivilegeObject(TABLE_OR_VIEW, actionType, quote(table.namespace()), table.name(), columns)
+  }
+
+  // namespace commands
+
+  val CreateNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATEDATABASE,
+    buildOutput = (plan, outputObjs, _, _) => {
+      if (isSparkVersionAtLeast("3.3")) {
+        val resolvedNamespace = getFieldVal[Any](plan, "name")
+        val databases = getFieldVal[Seq[String]](resolvedNamespace, "nameParts")
+        outputObjs += databasePrivileges(quote(databases))
+      } else {
+        val namespace = getFieldVal[Seq[String]](plan, "namespace")
+        outputObjs += databasePrivileges(quote(namespace))
+      }
+    })
+
+  val DropNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = DROPDATABASE,
+    buildOutput = (plan, outputObjs, _, _) => {
+      val resolvedNamespace = getFieldVal[LogicalPlan](plan, "namespace")
+      val databases = getFieldVal[Seq[String]](resolvedNamespace, "namespace")
+      outputObjs += databasePrivileges(quote(databases))
+    })
+
+  // with V2CreateTablePlan
+
+  val CreateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATETABLE,
+    commandTypes = Seq(HasTableNameAsIdentifier),
+    leastVer = Some("3.3"))
+
+  val CreateV2Table: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATETABLE,
+    commandTypes = Seq(HasTableNameAsIdentifier),
+    mostVer = Some("3.2"))
+
+  val CreateTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATETABLE,
+    commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan))
+
+  val ReplaceTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATETABLE,
+    commandTypes = Seq(HasTableNameAsIdentifier))
+
+  val ReplaceTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATETABLE,
+    commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan))
+
+  // with V2WriteCommand
+
+  val AppendData: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    commandTypes = Seq(HasTableAsIdentifierOption, HasQueryAsLogicalPlan),
+    outputActionType = PrivilegeObjectActionType.INSERT)
+
+  val UpdateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    commandTypes = Seq(HasTableAsIdentifierOption),
+    outputActionType = PrivilegeObjectActionType.UPDATE)
+
+  val DeleteFromTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    commandTypes = Seq(HasTableAsIdentifierOption),
+    outputActionType = PrivilegeObjectActionType.UPDATE)
+
+  val OverwriteByExpression: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    commandTypes = Seq(HasTableAsIdentifierOption, HasQueryAsLogicalPlan),
+    outputActionType = PrivilegeObjectActionType.UPDATE)
+
+  val OverwritePartitionsDynamic: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    commandTypes = Seq(HasTableAsIdentifierOption, HasQueryAsLogicalPlan),
+    outputActionType = PrivilegeObjectActionType.UPDATE)
+
+  // with V2PartitionCommand
+
+  val AddPartitions: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = OperationType.ALTERTABLE_ADDPARTS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val DropPartitions: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = OperationType.ALTERTABLE_DROPPARTS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val RenamePartitions: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = OperationType.ALTERTABLE_ADDPARTS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val TruncatePartition: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = OperationType.ALTERTABLE_DROPPARTS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  // other table commands
+
+  val CacheTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATEVIEW,
+    leastVer = Some("3.2"),
+    buildInput = (plan, inputObjs, _) => {
+      val query = getFieldVal[LogicalPlan](plan, "table") // table to cache
+      buildQuery(query, inputObjs)
+    })
+
+  val CacheTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = CREATEVIEW,
+    leastVer = Some("3.2"),
+    buildInput = (plan, inputObjs, _) => {
+      val query = getFieldVal[LogicalPlan](plan, "plan")
+      buildQuery(query, inputObjs)
+    })
+
+  val CommentOnNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERDATABASE,
+    buildOutput = (plan, outputObjs, _, _) => {
+      val resolvedNamespace = getFieldVal[AnyRef](plan, "child")
+      val namespace = getFieldVal[Seq[String]](resolvedNamespace, "namespace")
+      outputObjs += databasePrivileges(quote(namespace))
+    })
+
+  val CommentOnTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_PROPERTIES,
+    commandTypes = Seq(
+      if (isSparkVersionAtLeast("3.2")) HasTableAsIdentifier else HasChildAsIdentifier))
+
+  val DropTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = DROPTABLE,
+    buildOutput = (plan, outputObjs, _, _) => {
+      val tableIdent =
+        if (isSparkVersionAtLeast("3.1")) {
+          val resolvedTable = getFieldVal[LogicalPlan](plan, "child")
+          getFieldVal[Identifier](resolvedTable, "identifier")
+        } else {
+          getFieldVal[Identifier](plan, "ident")
+        }
+      outputObjs += v2TablePrivileges(tableIdent)
+    })
+  val MergeIntoTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    buildInput = (plan, inputObjs, _) => {
+      val table = getFieldVal[DataSourceV2Relation](plan, "sourceTable")
+      buildQuery(table, inputObjs)
+    },
+    buildOutput = (plan, outputObjs, _, _) => {
+      val table = getFieldVal[DataSourceV2Relation](plan, "targetTable")
+      if (table.identifier.isDefined) {
+        outputObjs += v2TablePrivileges(
+          table.identifier.get,
+          actionType = PrivilegeObjectActionType.UPDATE)
+      }
+    })
+
+  val RepairTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_ADDPARTS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasChildAsIdentifier))
+
+  val TruncateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    leastVer = Some("3.2"),
+    buildOutput = (plan, outputObjs, _, _) => {
+      val table = getFieldVal[Any](plan, "table")
+      val tableIdent = getFieldVal[Identifier](table, "identifier")
+      outputObjs += v2TablePrivileges(tableIdent, actionType = PrivilegeObjectActionType.UPDATE)
+    })
+
+  // with V2AlterTableCommand
+
+  val AlterTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_ADDCOLS,
+    mostVer = Some("3.1"),
+    buildOutput = (plan, outputObjs, _, _) => {
+      val table = getFieldVal[Any](plan, "table")
+      val tableIdent = getFieldVal[Option[Identifier]](table, "identifier")
+      if (tableIdent.isDefined) {
+        outputObjs += v2TablePrivileges(tableIdent.get)
+      }
+    })
+
+  val AddColumns: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_ADDCOLS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val AlterColumn: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_ADDCOLS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val DropColumns: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_ADDCOLS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val ReplaceColumns: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_REPLACECOLS,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+
+  val RenameColumn: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
+    operationType = ALTERTABLE_RENAMECOL,
+    leastVer = Some("3.2"),
+    commandTypes = Seq(HasTableAsIdentifier))
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 1095a7b7c..d8a7eedd4 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -34,8 +34,7 @@ import org.scalatest.BeforeAndAfterAll
 // scalastyle:off
 import org.scalatest.funsuite.AnyFunSuite
 
-import org.apache.kyuubi.plugin.spark.authz.AccessControlException
-import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.{AccessControlException, SparkSessionProvider}
 import org.apache.kyuubi.plugin.spark.authz.ranger.RuleAuthorization.KYUUBI_AUTHZ_TAG
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.getFieldVal
 
@@ -777,7 +776,8 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
 
         val e1 = intercept[AccessControlException](
           doAs("someone", sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1")))
-        assert(e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]"))
+        assert(
+          e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]"))
 
         doAs("admin", sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b "))
         doAs("someone", sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b "))
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
new file mode 100644
index 000000000..41cd6d8a6
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.plugin.spark.authz.ranger
+
+import java.sql.DriverManager
+
+import scala.util.Try
+
+// scalastyle:off
+import org.apache.kyuubi.plugin.spark.authz.AccessControlException
+
+/**
+ * Tests for RangerSparkExtensionSuite
+ * on JdbcTableCatalog with DataSource V2 API.
+ */
+class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
+  override protected val catalogImpl: String = "in-memory"
+
+  val catalogV2 = "testcat"
+  val jdbcCatalogV2 = "jdbc2"
+  val namespace1 = "ns1"
+  val namespace2 = "ns2"
+  val table1 = "table1"
+  val table2 = "table2"
+  val outputTable1 = "outputTable1"
+  val cacheTable1 = "cacheTable1"
+
+  val dbUrl = s"jdbc:derby:memory:$catalogV2"
+  val jdbcUrl: String = s"$dbUrl;create=true"
+
+  override def beforeAll(): Unit = {
+    if (isSparkV31OrGreater) {
+      spark.conf.set(
+        s"spark.sql.catalog.$catalogV2",
+        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
+      spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
+      spark.conf.set(
+        s"spark.sql.catalog.$catalogV2.driver",
+        "org.apache.derby.jdbc.AutoloadedDriver")
+
+      super.beforeAll()
+
+      doAs("admin", sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1"))
+      doAs(
+        "admin",
+        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table1" +
+          " (id int, name string, city string)"))
+      doAs(
+        "admin",
+        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$outputTable1" +
+          " (id int, name string, city string)"))
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.sessionState.catalog.reset()
+    spark.sessionState.conf.clear()
+
+    // cleanup db
+    Try {
+      DriverManager.getConnection(s"$dbUrl;shutdown=true")
+    }
+  }
+
+  test("[KYUUBI #3424] CREATE DATABASE") {
+    assume(isSparkV31OrGreater)
+
+    // create database
+    val e1 = intercept[AccessControlException](
+      doAs("someone", sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace2").explain()))
+    assert(e1.getMessage.contains(s"does not have [create] privilege" +
+      s" on [$namespace2]"))
+  }
+
+  test("[KYUUBI #3424] DROP DATABASE") {
+    assume(isSparkV31OrGreater)
+
+    // create database
+    val e1 = intercept[AccessControlException](
+      doAs("someone", sql(s"DROP DATABASE IF EXISTS $catalogV2.$namespace2").explain()))
+    assert(e1.getMessage.contains(s"does not have [drop] privilege" +
+      s" on [$namespace2]"))
+  }
+
+  test("[KYUUBI #3424] SELECT TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // select
+    val e1 = intercept[AccessControlException](
+      doAs("someone", sql(s"select city, id from $catalogV2.$namespace1.$table1").explain()))
+    assert(e1.getMessage.contains(s"does not have [select] privilege" +
+      s" on [$namespace1/$table1/id]"))
+  }
+
+  test("[KYUUBI #3424] CREATE TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // CreateTable
+    val e2 = intercept[AccessControlException](
+      doAs("someone", sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2")))
+    assert(e2.getMessage.contains(s"does not have [create] privilege" +
+      s" on [$namespace1/$table2]"))
+
+    // CreateTableAsSelect
+    val e21 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
+          s" AS select * from $catalogV2.$namespace1.$table1")))
+    assert(e21.getMessage.contains(s"does not have [select] privilege" +
+      s" on [$namespace1/$table1/id]"))
+  }
+
+  test("[KYUUBI #3424] DROP TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // DropTable
+    val e3 = intercept[AccessControlException](
+      doAs("someone", sql(s"DROP TABLE $catalogV2.$namespace1.$table1")))
+    assert(e3.getMessage.contains(s"does not have [drop] privilege" +
+      s" on [$namespace1/$table1]"))
+  }
+
+  test("[KYUUBI #3424] INSERT TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // AppendData: Insert Using a VALUES Clause
+    val e4 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+          s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
+    assert(e4.getMessage.contains(s"does not have [update] privilege" +
+      s" on [$namespace1/$outputTable1]"))
+
+    // AppendData: Insert Using a TABLE Statement
+    val e42 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+          s" TABLE $catalogV2.$namespace1.$table1")))
+    assert(e42.getMessage.contains(s"does not have [select] privilege" +
+      s" on [$namespace1/$table1/id]"))
+
+    // AppendData: Insert Using a SELECT Statement
+    val e43 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+          s" SELECT * from $catalogV2.$namespace1.$table1")))
+    assert(e43.getMessage.contains(s"does not have [select] privilege" +
+      s" on [$namespace1/$table1/id]"))
+
+    // OverwriteByExpression: Insert Overwrite
+    val e44 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+          s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
+    assert(e44.getMessage.contains(s"does not have [update] privilege" +
+      s" on [$namespace1/$outputTable1]"))
+  }
+
+  test("[KYUUBI #3424] MERGE INTO") {
+    assume(isSparkV31OrGreater)
+
+    val mergeIntoSql =
+      s"""
+         |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
+         |USING $catalogV2.$namespace1.$table1  AS source
+         |ON target.id = source.id
+         |WHEN MATCHED AND (target.name='delete') THEN DELETE
+         |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
+      """.stripMargin
+
+    // MergeIntoTable:  Using a MERGE INTO Statement
+    val e1 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(mergeIntoSql)))
+    assert(e1.getMessage.contains(s"does not have [select] privilege" +
+      s" on [$namespace1/$table1/id]"))
+
+    try {
+      SparkRangerAdminPlugin.getRangerConf.setBoolean(
+        s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call",
+        true)
+      val e2 = intercept[AccessControlException](
+        doAs(
+          "someone",
+          sql(mergeIntoSql)))
+      assert(e2.getMessage.contains(s"does not have" +
+        s" [select] privilege" +
+        s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
+        s" [update] privilege on [$namespace1/$outputTable1]"))
+    } finally {
+      SparkRangerAdminPlugin.getRangerConf.setBoolean(
+        s"ranger.plugin.${SparkRangerAdminPlugin.getServiceType}.authorize.in.single.call",
+        false)
+    }
+  }
+
+  test("[KYUUBI #3424] UPDATE TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // UpdateTable
+    val e5 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Hangzhou' " +
+          " WHERE id=1")))
+    assert(e5.getMessage.contains(s"does not have [update] privilege" +
+      s" on [$namespace1/$table1]"))
+  }
+
+  test("[KYUUBI #3424] DELETE FROM TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // DeleteFromTable
+    val e6 = intercept[AccessControlException](
+      doAs("someone", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=1")))
+    assert(e6.getMessage.contains(s"does not have [update] privilege" +
+      s" on [$namespace1/$table1]"))
+  }
+
+  test("[KYUUBI #3424] CACHE TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // CacheTable
+    val e7 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"CACHE TABLE $cacheTable1" +
+          s" AS select * from $catalogV2.$namespace1.$table1")))
+    if (isSparkV32OrGreater) {
+      assert(e7.getMessage.contains(s"does not have [select] privilege" +
+        s" on [$namespace1/$table1/id]"))
+    } else {
+      assert(e7.getMessage.contains(s"does not have [select] privilege" +
+        s" on [$catalogV2.$namespace1/$table1]"))
+    }
+  }
+
+  test("[KYUUBI #3424] TRUNCATE TABLE") {
+    assume(isSparkV32OrGreater)
+
+    // CreateView with select
+    val e1 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"TRUNCATE TABLE $catalogV2.$namespace1.$table1")))
+    assert(e1.getMessage.contains(s"does not have [update] privilege" +
+      s" on [$namespace1/$table1]"))
+  }
+
+  test("[KYUUBI #3424] MSCK REPAIR TABLE") {
+    assume(isSparkV32OrGreater)
+
+    // CreateView with select
+    val e1 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"MSCK REPAIR TABLE $catalogV2.$namespace1.$table1")))
+    assert(e1.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1/$table1]"))
+  }
+
+  test("[KYUUBI #3424] ALTER TABLE") {
+    assume(isSparkV31OrGreater)
+
+    // AddColumns
+    val e61 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"ALTER TABLE $catalogV2.$namespace1.$table1 ADD COLUMNS (age int) ").explain()))
+    assert(e61.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1/$table1]"))
+
+    // DropColumns
+    val e62 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"ALTER TABLE $catalogV2.$namespace1.$table1 DROP COLUMNS city ").explain()))
+    assert(e62.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1/$table1]"))
+
+    // RenameColumn
+    val e63 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"ALTER TABLE $catalogV2.$namespace1.$table1 RENAME COLUMN city TO city2 ").explain()))
+    assert(e63.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1/$table1]"))
+
+    // AlterColumn
+    val e64 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"ALTER TABLE $catalogV2.$namespace1.$table1 " +
+          s"ALTER COLUMN city COMMENT 'city' ")))
+    assert(e64.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1/$table1]"))
+  }
+
+  test("[KYUUBI #3424] COMMENT ON") {
+    assume(isSparkV31OrGreater)
+
+    // CommentOnNamespace
+    val e1 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"COMMENT ON DATABASE $catalogV2.$namespace1 IS 'xYz' ").explain()))
+    assert(e1.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1]"))
+
+    // CommentOnNamespace
+    val e2 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"COMMENT ON NAMESPACE $catalogV2.$namespace1 IS 'xYz' ").explain()))
+    assert(e2.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1]"))
+
+    // CommentOnTable
+    val e3 = intercept[AccessControlException](
+      doAs(
+        "someone",
+        sql(s"COMMENT ON TABLE $catalogV2.$namespace1.$table1 IS 'xYz' ").explain()))
+    assert(e3.getMessage.contains(s"does not have [alter] privilege" +
+      s" on [$namespace1/$table1]"))
+
+  }
+}