You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/08/09 11:20:21 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3186] Support applying Row-level Filter and Data Masking policies for DatasourceV2 in Authz module

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b1d920b [KYUUBI #3186] Support applying Row-level Filter and Data Masking policies for DatasourceV2 in Authz module
64b1d920b is described below

commit 64b1d920b6c4e04d7ee06d0d6586f4667f9b7773
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Tue Aug 9 19:20:11 2022 +0800

    [KYUUBI #3186] Support applying Row-level Filter and Data Masking policies for DatasourceV2 in Authz module
    
    ### _Why are the changes needed?_
    
    `RuleApplyRowFilterAndDataMasking` in AuthZ currently applies row-level filters correctly on both `HiveTableRelation` for Hive tables and `LogicalRelation`, but does not affect the execution plan for Iceberg tables.
    
    As Iceberg tables are accessed via `DataSourceV2Relation`, `RuleApplyRowFilterAndDataMasking` is adapted to apply row filtering based on a `TableIdentifier` for the different relations (including `HiveTableRelation` and `LogicalRelation`) and to fetch the correct table identifier from `DataSourceV2Relation`.
    
    Consider an Iceberg table `gftest.sampleice` with a row-level filter `id='1'`.
    
    SQL:
    `EXPLAIN EXTENDED select *from gftest.sampleice limit 5;`
    
    Execution Plan before this PR:
    ```
    == Parsed Logical Plan ==
    'GlobalLimit 5
    +- 'LocalLimit 5
       +- 'Project [*]
          +- 'UnresolvedRelation [gftest, sampleice], [], false
    
    == Analyzed Logical Plan ==
    id: bigint, data: string
    GlobalLimit 5
    +- LocalLimit 5
       +- Project [id#13332L, data#13333]
          +- SubqueryAlias spark_catalog.gftest.sampleice
             +- RelationV2[id#13332L, data#13333] spark_catalog.gftest.sampleice
    
    == Optimized Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- RelationV2[id#13332L, data#13333] spark_catalog.gftest.sampleice
    
    == Physical Plan ==
    CollectLimit 5
    +- *(1) ColumnarToRow
       +- BatchScan[id#13332L, data#13333] spark_catalog.gftest.sampleice [filters=] RuntimeFilters: []
    
    ```
    
    Execution Plan after this PR:
    ```
    == Parsed Logical Plan ==
    'GlobalLimit 5
    +- 'LocalLimit 5
       +- 'Project [*]
          +- 'UnresolvedRelation [gftest, sampleice], [], false
    
    == Analyzed Logical Plan ==
    id: bigint, data: string
    GlobalLimit 5
    +- LocalLimit 5
       +- Project [id#142L, data#143]
          +- SubqueryAlias spark_catalog.gftest.sampleice
             +- Project [id#142L, data#143]
                +- Filter (id#142L = cast(1 as bigint))
                   +- RowFilterAndDataMaskingMarker
                      +- RelationV2[id#142L, data#143] spark_catalog.gftest.sampleice
    
    == Optimized Logical Plan ==
    GlobalLimit 5
    +- LocalLimit 5
       +- Filter (isnotnull(id#142L) AND (id#142L = 1))
          +- RelationV2[id#142L, data#143] spark_catalog.gftest.sampleice
    
    == Physical Plan ==
    CollectLimit 5
    +- *(1) Filter (isnotnull(id#142L) AND (id#142L = 1))
       +- *(1) ColumnarToRow
          +- BatchScan[id#142L, data#143] spark_catalog.gftest.sampleice [filters=id IS NOT NULL, id = 1] RuntimeFilters: []
    ```
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3197 from bowenliang123/feature_pr_rowfilter_dsv2.
    
    Closes #3186
    
    f8b10396 [Bowen Liang] Merge branch 'master' into feature_pr_rowfilter_dsv2
    1d476307 [liangbowen] 1.refactoring getDatasourceV2Identifier compatible for multiple spark versions 2.moving quote from PrivilegeBuilder to AuthZUtils
    0853f864 [liangbowen] apply mvn spotless:apply
    7d26e99f [liangbowen] remove unnecessary return
    84b4beaa [liangbowen] change to use Identifier of DataSourceV2Relation instead of Table
    f5def106 [liangbowen] remove unnecessary table null condition
    a1516d55 [liangbowen] fix org.apache.iceberg.spark.source.SparkTable cannot be cast to scala.Option
    bd173221 [liangbowen] add comments
    804be4e1 [liangbowen] fix method signature
    75980965 [liangbowen] adapting rowfilter for DataSourceV2Relation,by adjusting applyFilterAndMasking via TableIdentifier
    
    Lead-authored-by: liangbowen <li...@gf.com.cn>
    Co-authored-by: Bowen Liang <bo...@gmail.com>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 .../plugin/spark/authz/PrivilegesBuilder.scala     | 12 ----------
 .../ranger/RuleApplyRowFilterAndDataMasking.scala  | 18 ++++++++++-----
 .../plugin/spark/authz/util/AuthZUtils.scala       | 27 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index e6090d327..459698e59 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -34,18 +34,6 @@ import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 
 object PrivilegesBuilder {
 
-  private def quoteIfNeeded(part: String): String = {
-    if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
-      part
-    } else {
-      s"`${part.replace("`", "``")}`"
-    }
-  }
-
-  private def quote(parts: Seq[String]): String = {
-    parts.map(quoteIfNeeded).mkString(".")
-  }
-
   private def databasePrivileges(db: String): PrivilegeObject = {
     PrivilegeObject(DATABASE, PrivilegeObjectActionType.OTHER, db, db)
   }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
index 78081b555..b18ce83e3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -30,19 +30,26 @@ import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker
 class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation with
+    // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation/DataSourceV2Relation with
     // RowFilterAndDataMaskingMarker if it is not wrapped yet.
     plan mapChildren {
       case p: RowFilterAndDataMaskingMarker => p
       case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
         val table = getHiveTable(hiveTableRelation)
-        applyFilterAndMasking(hiveTableRelation, table, spark)
+        applyFilterAndMasking(hiveTableRelation, table.identifier, spark)
       case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
         val table = getDatasourceTable(logicalRelation)
         if (table.isEmpty) {
           logicalRelation
         } else {
-          applyFilterAndMasking(logicalRelation, table.get, spark)
+          applyFilterAndMasking(logicalRelation, table.get.identifier, spark)
+        }
+      case datasourceV2Relation if hasResolvedDatasourceV2Table(datasourceV2Relation) =>
+        val tableIdentifier = getDatasourceV2Identifier(datasourceV2Relation)
+        if (tableIdentifier.isEmpty) {
+          datasourceV2Relation
+        } else {
+          applyFilterAndMasking(datasourceV2Relation, tableIdentifier.get, spark)
         }
       case other => apply(other)
     }
@@ -50,9 +57,8 @@ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[Logical
 
   private def applyFilterAndMasking(
       plan: LogicalPlan,
-      table: CatalogTable,
+      identifier: TableIdentifier,
       spark: SparkSession): LogicalPlan = {
-    val identifier = table.identifier
     val ugi = getAuthzUgi(spark.sparkContext)
     val opType = OperationType(plan.nodeName)
     val parse = spark.sessionState.sqlParser.parseExpression _
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 038155a03..377e99ce9 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
@@ -83,6 +84,20 @@ private[authz] object AuthZUtils {
     getFieldVal[Option[CatalogTable]](plan, "catalogTable")
   }
 
+  def hasResolvedDatasourceV2Table(plan: LogicalPlan): Boolean = {
+    plan.nodeName == "DataSourceV2Relation" && plan.resolved
+  }
+
+  def getDatasourceV2Identifier(plan: LogicalPlan): Option[TableIdentifier] = {
+    // avoid importing DataSourceV2Relation for Spark version compatibility
+    val identifier = getFieldVal[Option[AnyRef]](plan, "identifier")
+    identifier.map { id =>
+      val namespaces = invoke(id, "namespace").asInstanceOf[Array[String]]
+      val table = invoke(id, "name").asInstanceOf[String]
+      TableIdentifier(table, Some(quote(namespaces)))
+    }
+  }
+
   def isSparkVersionAtMost(targetVersionString: String): Boolean = {
     SemanticVersion(SPARK_VERSION).isVersionAtMost(targetVersionString)
   }
@@ -94,4 +109,16 @@ private[authz] object AuthZUtils {
   def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
     SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
   }
+
+  def quoteIfNeeded(part: String): String = {
+    if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
+      part
+    } else {
+      s"`${part.replace("`", "``")}`"
+    }
+  }
+
+  def quote(parts: Seq[String]): String = {
+    parts.map(quoteIfNeeded).mkString(".")
+  }
 }