You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/06/30 07:20:54 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 8d4d00feb [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
8d4d00feb is described below
commit 8d4d00feb3cd7e6ef5e942531856de532ed3a74f
Author: zhouyifan279 <zh...@gmail.com>
AuthorDate: Thu Jun 30 15:20:44 2022 +0800
[KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
### _Why are the changes needed?_
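Fix #2918. With the Ranger authz extension enabled, queries on Hive tables failed with "table stats must be specified". `RowFilterAndDataMaskingMarker` was a `LeafNode` that held the wrapped relation as a field rather than as a child. Spark rules that walk the logical plan therefore could not reach the `HiveTableRelation` inside it, for example to fill in its table statistics or to convert it to a `LogicalRelation`. The marker is now a `UnaryNode` whose child is the wrapped relation.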
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2967 from zhouyifan279/2918.
Closes #2918
79800d5b [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
b279d2f5 [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
fcb1f8a3 [Min Zhao] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
3cab67b1 [Min Zhao] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
cff04d1c [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
24aaf81e [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
306508f8 [zhouyifan279] [KYUUBI #2918][Bug] Kyuubi integrated Ranger failed to query: table stats must be specified
Lead-authored-by: zhouyifan279 <zh...@gmail.com>
Co-authored-by: Min Zhao <zh...@163.com>
Signed-off-by: Kent Yao <ya...@apache.org>
---
.../ranger/RuleApplyRowFilterAndDataMasking.scala | 6 ++-
.../ranger/RuleReplaceShowObjectCommands.scala | 8 +--
.../authz/util/RowFilterAndDataMaskingMarker.scala | 12 +++--
.../spark/authz/util/RuleEliminateMarker.scala | 2 +-
...ernalChild.scala => WithInternalChildren.scala} | 6 ++-
.../authz/ranger/RangerSparkExtensionSuite.scala | 58 +++++++++++++++++++++-
6 files changed, 81 insertions(+), 11 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
index 5ea1d2737..78081b555 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
@@ -30,7 +30,10 @@ import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker
class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- plan transformUp {
+ // Apply FilterAndMasking and wrap HiveTableRelation/LogicalRelation with
+ // RowFilterAndDataMaskingMarker if it is not wrapped yet.
+ plan mapChildren {
+ case p: RowFilterAndDataMaskingMarker => p
case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
val table = getHiveTable(hiveTableRelation)
applyFilterAndMasking(hiveTableRelation, table, spark)
@@ -41,6 +44,7 @@ class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[Logical
} else {
applyFilterAndMasking(logicalRelation, table.get, spark)
}
+ case other => apply(other)
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
index 22f91249b..08d2b4fd0 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleReplaceShowObjectCommands.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{RunnableCommand, ShowColumnsCommand}
import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
-import org.apache.kyuubi.plugin.spark.authz.util.{AuthZUtils, ObjectFilterPlaceHolder, WithInternalChild}
+import org.apache.kyuubi.plugin.spark.authz.util.{AuthZUtils, ObjectFilterPlaceHolder, WithInternalChildren}
class RuleReplaceShowObjectCommands extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
@@ -76,7 +76,7 @@ case class FilteredShowDatabasesCommand(delegated: RunnableCommand)
}
abstract class FilteredShowObjectCommand(delegated: RunnableCommand)
- extends RunnableCommand with WithInternalChild {
+ extends RunnableCommand with WithInternalChildren {
override val output: Seq[Attribute] = delegated.output
@@ -92,7 +92,7 @@ abstract class FilteredShowObjectCommand(delegated: RunnableCommand)
}
case class FilteredShowFunctionsCommand(delegated: RunnableCommand)
- extends FilteredShowObjectCommand(delegated) with WithInternalChild {
+ extends FilteredShowObjectCommand(delegated) with WithInternalChildren {
override protected def isAllowed(r: Row, ugi: UserGroupInformation): Boolean = {
val functionName = r.getString(0)
@@ -110,7 +110,7 @@ case class FilteredShowFunctionsCommand(delegated: RunnableCommand)
}
case class FilteredShowColumnsCommand(delegated: RunnableCommand)
- extends FilteredShowObjectCommand(delegated) with WithInternalChild {
+ extends FilteredShowObjectCommand(delegated) with WithInternalChildren {
override val output: Seq[Attribute] = delegated.output
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
index b6da24217..357e9bfc2 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
@@ -18,8 +18,14 @@
package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+
+case class RowFilterAndDataMaskingMarker(child: LogicalPlan) extends UnaryNode
+ with WithInternalChild {
+
+ override def output: Seq[Attribute] = child.output
+
+ override def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+ copy(child = newChild)
-case class RowFilterAndDataMaskingMarker(table: LogicalPlan) extends LeafNode {
- override def output: Seq[Attribute] = table.output
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
index 77e4083ab..d2da72570 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
@@ -22,6 +22,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
class RuleEliminateMarker extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.table }
+ plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.child }
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChild.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChildren.scala
similarity index 91%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChild.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChildren.scala
index f10e72e28..bbce1dff8 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChild.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/WithInternalChildren.scala
@@ -19,6 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-trait WithInternalChild {
+trait WithInternalChildren {
def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): LogicalPlan
}
+
+trait WithInternalChild {
+ def withNewChildInternal(newChild: LogicalPlan): LogicalPlan
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 30f1a9f9c..ec48c6587 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -25,18 +25,22 @@ import scala.util.Try
import org.apache.commons.codec.digest.DigestUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.scalatest.BeforeAndAfterAll
// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite
import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.getFieldVal
abstract class RangerSparkExtensionSuite extends AnyFunSuite
with SparkSessionProvider with BeforeAndAfterAll {
// scalastyle:on
override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension
- private def doAs[T](user: String, f: => T): T = {
+ protected def doAs[T](user: String, f: => T): T = {
UserGroupInformation.createRemoteUser(user).doAs[T](
new PrivilegedExceptionAction[T] {
override def run(): T = f
@@ -380,4 +384,56 @@ class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
override protected val catalogImpl: String = "hive"
+
+ test("table stats must be specified") {
+ val table = "hive_src"
+ try {
+ doAs("admin", sql(s"CREATE TABLE IF NOT EXISTS $table (id int)"))
+ doAs(
+ "admin", {
+ val hiveTableRelation = sql(s"SELECT * FROM $table")
+ .queryExecution.optimizedPlan.collectLeaves().head.asInstanceOf[HiveTableRelation]
+ assert(getFieldVal[Option[Statistics]](hiveTableRelation, "tableStats").nonEmpty)
+ })
+ } finally {
+ doAs("admin", sql(s"DROP TABLE IF EXISTS $table"))
+ }
+ }
+
+ test("HiveTableRelation should be able to be converted to LogicalRelation") {
+ val table = "hive_src"
+ try {
+ doAs("admin", sql(s"CREATE TABLE IF NOT EXISTS $table (id int) STORED AS PARQUET"))
+ doAs(
+ "admin", {
+ val relation = sql(s"SELECT * FROM $table")
+ .queryExecution.optimizedPlan.collectLeaves().head
+ assert(relation.isInstanceOf[LogicalRelation])
+ })
+ } finally {
+ doAs("admin", sql(s"DROP TABLE IF EXISTS $table"))
+ }
+ }
+
+ test("Pass through JoinSelection") {
+ val db = "test"
+ val table1 = "table1"
+ val table2 = "table2"
+
+ doAs(
+ "admin",
+ try {
+ sql(s"CREATE DATABASE IF NOT EXISTS $db")
+ sql(s"CREATE TABLE IF NOT EXISTS $db.$table1(id int) STORED AS PARQUET")
+ sql(s"INSERT INTO $db.$table1 SELECT 1")
+ sql(s"CREATE TABLE IF NOT EXISTS $db.$table2(id int, name string) STORED AS PARQUET")
+ sql(s"INSERT INTO $db.$table2 SELECT 1, 'a'")
+ val join = s"SELECT a.id, b.name FROM $db.$table1 a JOIN $db.$table2 b ON a.id=b.id"
+ assert(sql(join).collect().length == 1)
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $db.$table2")
+ sql(s"DROP TABLE IF EXISTS $db.$table1")
+ sql(s"DROP DATABASE IF EXISTS $db")
+ })
+ }
}