You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/08/09 11:20:21 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3186] Support applying Row-level Filter and Data Masking policies for DatasourceV2 in Authz module
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 64b1d920b [KYUUBI #3186] Support applying Row-level Filter and Data Masking policies for DatasourceV2 in Authz module
64b1d920b is described below
commit 64b1d920b6c4e04d7ee06d0d6586f4667f9b7773
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Tue Aug 9 19:20:11 2022 +0800
[KYUUBI #3186] Support applying Row-level Filter and Data Masking policies for DatasourceV2 in Authz module
### _Why are the changes needed?_
`RuleApplyRowFilterAndDataMasking` in AuthZ currently applies row-level filters correctly to both `HiveTableRelation` (for Hive tables) and `LogicalRelation`, but does not affect the execution plan for Iceberg tables.
Because Iceberg tables are accessed via `DataSourceV2Relation`, `RuleApplyRowFilterAndDataMasking` is adapted to apply row filtering based on a `TableIdentifier` for the different relation types (including `HiveTableRelation` and `LogicalRelation`), and to fetch the correct table identifier from `DataSourceV2Relation`.
Consider an Iceberg table `gftest.sampleice` with a row-level filter `id='1'`.
SQL:
`EXPLAIN EXTENDED select * from gftest.sampleice limit 5;`
Execution Plan before this PR:
```
== Parsed Logical Plan ==
'GlobalLimit 5
+- 'LocalLimit 5
   +- 'Project [*]
      +- 'UnresolvedRelation [gftest, sampleice], [], false
== Analyzed Logical Plan ==
id: bigint, data: string
GlobalLimit 5
+- LocalLimit 5
   +- Project [id#13332L, data#13333]
      +- SubqueryAlias spark_catalog.gftest.sampleice
         +- RelationV2[id#13332L, data#13333] spark_catalog.gftest.sampleice
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- RelationV2[id#13332L, data#13333] spark_catalog.gftest.sampleice
== Physical Plan ==
CollectLimit 5
+- *(1) ColumnarToRow
   +- BatchScan[id#13332L, data#13333] spark_catalog.gftest.sampleice [filters=] RuntimeFilters: []
```
Execution Plan after this PR:
```
== Parsed Logical Plan ==
'GlobalLimit 5
+- 'LocalLimit 5
   +- 'Project [*]
      +- 'UnresolvedRelation [gftest, sampleice], [], false
== Analyzed Logical Plan ==
id: bigint, data: string
GlobalLimit 5
+- LocalLimit 5
   +- Project [id#142L, data#143]
      +- SubqueryAlias spark_catalog.gftest.sampleice
         +- Project [id#142L, data#143]
            +- Filter (id#142L = cast(1 as bigint))
               +- RowFilterAndDataMaskingMarker
                  +- RelationV2[id#142L, data#143] spark_catalog.gftest.sampleice
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- Filter (isnotnull(id#142L) AND (id#142L = 1))
      +- RelationV2[id#142L, data#143] spark_catalog.gftest.sampleice
== Physical Plan ==
CollectLimit 5
+- *(1) Filter (isnotnull(id#142L) AND (id#142L = 1))
   +- *(1) ColumnarToRow
      +- BatchScan[id#142L, data#143] spark_catalog.gftest.sampleice [filters=id IS NOT NULL, id = 1] RuntimeFilters: []
```
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3197 from bowenliang123/feature_pr_rowfilter_dsv2.
Closes #3186
f8b10396 [Bowen Liang] Merge branch 'master' into feature_pr_rowfilter_dsv2
1d476307 [liangbowen] 1.refactoring getDatasourceV2Identifier compatible for multiple spark versions 2.moving quote from PrivilegeBuilder to AuthZUtils
0853f864 [liangbowen] apply mvn spotless:apply
7d26e99f [liangbowen] remove unnecessary return
84b4beaa [liangbowen] change to use Identifier of DataSourceV2Relation instead of Table
f5def106 [liangbowen] remove unnecessary table null condition
a1516d55 [liangbowen] fix org.apache.iceberg.spark.source.SparkTable cannot be cast to scala.Option
bd173221 [liangbowen] add comments
804be4e1 [liangbowen] fix method signature
75980965 [liangbowen] adapting rowfilter for DataSourceV2Relation,by adjusting applyFilterAndMasking via TableIdentifier
Lead-authored-by: liangbowen <li...@gf.com.cn>
Co-authored-by: Bowen Liang <bo...@gmail.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../plugin/spark/authz/PrivilegesBuilder.scala | 12 ----------
.../ranger/RuleApplyRowFilterAndDataMasking.scala | 18 ++++++++++-----
.../plugin/spark/authz/util/AuthZUtils.scala | 27 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 18 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index e6090d327..459698e59 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -34,18 +34,6 @@ import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
object PrivilegesBuilder {
- private def quoteIfNeeded(part: String): String = {
- if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
- part
- } else {
- s"`${part.replace("`", "``")}`"
- }
- }
-
- private def quote(parts: Seq[String]): String = {
- parts.map(quoteIfNeeded).mkString(".")
- }
-
private def databasePrivileges(db: String): PrivilegeObject = {
PrivilegeObject(DATABASE, PrivilegeObjectActionType.OTHER, db, db)
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
index 78081b555..b18ce83e3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -30,19 +30,26 @@ import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker
class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation with
+ // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation/DataSourceV2Relation with
// RowFilterAndDataMaskingMarker if it is not wrapped yet.
plan mapChildren {
case p: RowFilterAndDataMaskingMarker => p
case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
val table = getHiveTable(hiveTableRelation)
- applyFilterAndMasking(hiveTableRelation, table, spark)
+ applyFilterAndMasking(hiveTableRelation, table.identifier, spark)
case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
val table = getDatasourceTable(logicalRelation)
if (table.isEmpty) {
logicalRelation
} else {
- applyFilterAndMasking(logicalRelation, table.get, spark)
+ applyFilterAndMasking(logicalRelation, table.get.identifier, spark)
+ }
+ case datasourceV2Relation if hasResolvedDatasourceV2Table(datasourceV2Relation) =>
+ val tableIdentifier = getDatasourceV2Identifier(datasourceV2Relation)
+ if (tableIdentifier.isEmpty) {
+ datasourceV2Relation
+ } else {
+ applyFilterAndMasking(datasourceV2Relation, tableIdentifier.get, spark)
}
case other => apply(other)
}
@@ -50,9 +57,8 @@ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[Logical
private def applyFilterAndMasking(
plan: LogicalPlan,
- table: CatalogTable,
+ identifier: TableIdentifier,
spark: SparkSession): LogicalPlan = {
- val identifier = table.identifier
val ugi = getAuthzUgi(spark.sparkContext)
val opType = OperationType(plan.nodeName)
val parse = spark.sessionState.sqlParser.parseExpression _
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 038155a03..377e99ce9 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -21,6 +21,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -83,6 +84,20 @@ private[authz] object AuthZUtils {
getFieldVal[Option[CatalogTable]](plan, "catalogTable")
}
+ def hasResolvedDatasourceV2Table(plan: LogicalPlan): Boolean = {
+ plan.nodeName == "DataSourceV2Relation" && plan.resolved
+ }
+
+ def getDatasourceV2Identifier(plan: LogicalPlan): Option[TableIdentifier] = {
+ // avoid importing DataSourceV2Relation for Spark version compatibility
+ val identifier = getFieldVal[Option[AnyRef]](plan, "identifier")
+ identifier.map { id =>
+ val namespaces = invoke(id, "namespace").asInstanceOf[Array[String]]
+ val table = invoke(id, "name").asInstanceOf[String]
+ TableIdentifier(table, Some(quote(namespaces)))
+ }
+ }
+
def isSparkVersionAtMost(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionAtMost(targetVersionString)
}
@@ -94,4 +109,16 @@ private[authz] object AuthZUtils {
def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
}
+
+ def quoteIfNeeded(part: String): String = {
+ if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
+ part
+ } else {
+ s"`${part.replace("`", "``")}`"
+ }
+ }
+
+ def quote(parts: Seq[String]): String = {
+ parts.map(quoteIfNeeded).mkString(".")
+ }
}