You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by bo...@apache.org on 2023/03/31 05:49:49 UTC
[kyuubi] branch master updated: [KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan
This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 92f191a66 [KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan
92f191a66 is described below
commit 92f191a660699414c02ed94c6834253d6a42318d
Author: liangbowen <li...@gf.com.cn>
AuthorDate: Fri Mar 31 13:49:36 2023 +0800
[KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to prevent holding unserializable spark plan
### _Why are the changes needed?_
To fix #4617.
- The reason for issue #4617 is that the delegated SparkPlan is not serializable during execution
- Collect the results for filtered show objects ahead of time in FilterDataSourceV2Strategy, so that the exec node no longer holds a reference to the delegated plan
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4634 from bowenliang123/4617-filter.
Closes #4617
fe00ef58a [liangbowen] rename results to result
65ce03a51 [liangbowen] fix 4617
Authored-by: liangbowen <li...@gf.com.cn>
Signed-off-by: liangbowen <li...@gf.com.cn>
---
.../authz/ranger/FilterDataSourceV2Strategy.scala | 7 +++--
.../authz/ranger/FilteredShowObjectsExec.scala | 36 +++++++++++++++-------
.../authz/ranger/RangerSparkExtensionSuite.scala | 2 ++
3 files changed, 32 insertions(+), 13 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
index 1109464ac..d39aacdcf 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
@@ -25,10 +25,13 @@ import org.apache.kyuubi.plugin.spark.authz.util.ObjectFilterPlaceHolder
class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
- spark.sessionState.planner.plan(child).map(FilteredShowNamespaceExec).toSeq
+ spark.sessionState.planner.plan(child)
+ .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowTables" =>
- spark.sessionState.planner.plan(child).map(FilteredShowTablesExec).toSeq
+ spark.sessionState.planner.plan(child)
+ .map(FilteredShowTablesExec(_, spark.sparkContext)).toSeq
+
case _ => Nil
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
index 7cc777d9b..67519118e 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -26,24 +27,29 @@ import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils
trait FilteredShowObjectsExec extends LeafExecNode {
- def delegated: SparkPlan
+ def result: Array[InternalRow]
- final override def output: Seq[Attribute] = delegated.output
-
- final private lazy val result = {
- delegated.executeCollect().filter(isAllowed(_, AuthZUtils.getAuthzUgi(sparkContext)))
- }
+ override def output: Seq[Attribute]
final override def doExecute(): RDD[InternalRow] = {
sparkContext.parallelize(result, 1)
}
+}
- protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
+trait FilteredShowObjectsCheck {
+ def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
}
-case class FilteredShowNamespaceExec(delegated: SparkPlan) extends FilteredShowObjectsExec {
+case class FilteredShowNamespaceExec(result: Array[InternalRow], output: Seq[Attribute])
+ extends FilteredShowObjectsExec {}
+object FilteredShowNamespaceExec extends FilteredShowObjectsCheck {
+ def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowNamespaceExec = {
+ val result = delegated.executeCollect()
+ .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
+ new FilteredShowNamespaceExec(result, delegated.output)
+ }
- override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
+ override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
val database = r.getString(0)
val resource = AccessResource(ObjectType.DATABASE, database, null, null)
val request = AccessRequest(resource, ugi, OperationType.SHOWDATABASES, AccessType.USE)
@@ -52,8 +58,16 @@ case class FilteredShowNamespaceExec(delegated: SparkPlan) extends FilteredShowO
}
}
-case class FilteredShowTablesExec(delegated: SparkPlan) extends FilteredShowObjectsExec {
- override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
+case class FilteredShowTablesExec(result: Array[InternalRow], output: Seq[Attribute])
+ extends FilteredShowObjectsExec {}
+object FilteredShowTablesExec extends FilteredShowObjectsCheck {
+ def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowTablesExec = {
+ val result = delegated.executeCollect()
+ .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
+ new FilteredShowTablesExec(result, delegated.output)
+ }
+
+ override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean = {
val database = r.getString(0)
val table = r.getString(1)
val isTemp = r.getBoolean(2)
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 3aa551c42..4ccf15cba 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -233,6 +233,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
doAs("admin", assert(sql(s"show tables from $db").collect().length === 2))
doAs("bob", assert(sql(s"show tables from $db").collect().length === 0))
doAs("i_am_invisible", assert(sql(s"show tables from $db").collect().length === 0))
+ doAs("i_am_invisible", assert(sql(s"show tables from $db").limit(1).isEmpty))
}
}
@@ -247,6 +248,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
doAs("bob", assert(sql(s"SHOW DATABASES").collect().length == 1))
doAs("bob", assert(sql(s"SHOW DATABASES").collectAsList().get(0).getString(0) == "default"))
+ doAs("i_am_invisible", assert(sql(s"SHOW DATABASES").limit(1).isEmpty))
}
}