You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/03/31 05:50:04 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan

This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new 520510ff5 [KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan
520510ff5 is described below

commit 520510ff56e1162394aaa7f2df70679d435ebaa8
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Fri Mar 31 13:49:36 2023 +0800

    [KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan
    
    ### _Why are the changes needed?_
    
    To fix #4617.
    - The reason for issue #4617 is that the delegated SparkPlan is not serializable at execution time
    - Collect the results for filtered show objects ahead of time in FilterDataSourceV2Strategy, so that the exec node no longer holds the delegated plan
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4634 from bowenliang123/4617-filter.
    
    Closes #4617
    
    fe00ef58a [liangbowen] rename results to result
    65ce03a51 [liangbowen] fix 4617
    
    Authored-by: liangbowen <li...@gf.com.cn>
    Signed-off-by: liangbowen <li...@gf.com.cn>
    (cherry picked from commit 92f191a660699414c02ed94c6834253d6a42318d)
    Signed-off-by: liangbowen <li...@gf.com.cn>
---
 .../authz/ranger/FilterDataSourceV2Strategy.scala  |  7 +++--
 .../authz/ranger/FilteredShowObjectsExec.scala     | 36 +++++++++++++++-------
 .../authz/ranger/RangerSparkExtensionSuite.scala   |  2 ++
 3 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
index 1109464ac..d39aacdcf 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
@@ -25,10 +25,13 @@ import org.apache.kyuubi.plugin.spark.authz.util.ObjectFilterPlaceHolder
 class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
-      spark.sessionState.planner.plan(child).map(FilteredShowNamespaceExec).toSeq
+      spark.sessionState.planner.plan(child)
+        .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
 
     case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowTables" =>
-      spark.sessionState.planner.plan(child).map(FilteredShowTablesExec).toSeq
+      spark.sessionState.planner.plan(child)
+        .map(FilteredShowTablesExec(_, spark.sparkContext)).toSeq
+
     case _ => Nil
   }
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
index 7cc777d9b..67519118e 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
@@ -17,6 +17,7 @@
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -26,24 +27,29 @@ import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils
 
 trait FilteredShowObjectsExec extends LeafExecNode {
-  def delegated: SparkPlan
+  def result: Array[InternalRow]
 
-  final override def output: Seq[Attribute] = delegated.output
-
-  final private lazy val result = {
-    delegated.executeCollect().filter(isAllowed(_, AuthZUtils.getAuthzUgi(sparkContext)))
-  }
+  override def output: Seq[Attribute]
 
   final override def doExecute(): RDD[InternalRow] = {
     sparkContext.parallelize(result, 1)
   }
+}
 
-  protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
+trait FilteredShowObjectsCheck {
+  def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
 }
 
-case class FilteredShowNamespaceExec(delegated: SparkPlan) extends FilteredShowObjectsExec {
+case class FilteredShowNamespaceExec(result: Array[InternalRow], output: Seq[Attribute])
+  extends FilteredShowObjectsExec {}
+object FilteredShowNamespaceExec extends FilteredShowObjectsCheck {
+  def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowNamespaceExec = {
+    val result = delegated.executeCollect()
+      .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
+    new FilteredShowNamespaceExec(result, delegated.output)
+  }
 
-  override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
+  override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
     val database = r.getString(0)
     val resource = AccessResource(ObjectType.DATABASE, database, null, null)
     val request = AccessRequest(resource, ugi, OperationType.SHOWDATABASES, AccessType.USE)
@@ -52,8 +58,16 @@ case class FilteredShowNamespaceExec(delegated: SparkPlan) extends FilteredShowO
   }
 }
 
-case class FilteredShowTablesExec(delegated: SparkPlan) extends FilteredShowObjectsExec {
-  override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
+case class FilteredShowTablesExec(result: Array[InternalRow], output: Seq[Attribute])
+  extends FilteredShowObjectsExec {}
+object FilteredShowTablesExec extends FilteredShowObjectsCheck {
+  def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowNamespaceExec = {
+    val result = delegated.executeCollect()
+      .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
+    new FilteredShowNamespaceExec(result, delegated.output)
+  }
+
+  override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
     val database = r.getString(0)
     val table = r.getString(1)
     val isTemp = r.getBoolean(2)
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 77ef32454..6479af3c7 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -310,6 +310,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
       doAs("admin", assert(sql(s"show tables from $db").collect().length === 2))
       doAs("bob", assert(sql(s"show tables from $db").collect().length === 0))
       doAs("i_am_invisible", assert(sql(s"show tables from $db").collect().length === 0))
+      doAs("i_am_invisible", assert(sql(s"show tables from $db").limit(1).isEmpty))
     }
   }
 
@@ -324,6 +325,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
 
       doAs("bob", assert(sql(s"SHOW DATABASES").collect().length == 1))
       doAs("bob", assert(sql(s"SHOW DATABASES").collectAsList().get(0).getString(0) == "default"))
+      doAs("i_am_invisible", assert(sql(s"SHOW DATABASES").limit(1).isEmpty))
     }
   }