You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/06 11:44:48 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1451] Add Row-level filtering support
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new c4a608f99 [KYUUBI #1451] Add Row-level filtering support
c4a608f99 is described below
commit c4a608f996cb3f0ede619595f5ef9b29a34a3ed9
Author: Kent Yao <ya...@apache.org>
AuthorDate: Wed Apr 6 19:44:40 2022 +0800
[KYUUBI #1451] Add Row-level filtering support
### _Why are the changes needed?_
Add row-level filtering support to the Kyuubi Spark AuthZ (Apache Ranger) plugin.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2267 from yaooqinn/rowfiltering.
Closes #1451
58f65146 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
5cc0ccad [Kent Yao] [KYUUBI #1451] Add Row level filtering support
ac26aab9 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
534912c3 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
74ca8187 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
---
extensions/spark/kyuubi-spark-authz/README.md | 2 +-
.../plugin/spark/authz/PrivilegesBuilder.scala | 26 ++------
.../plugin/spark/authz/ranger/AccessRequest.scala | 14 ++--
.../plugin/spark/authz/ranger/AccessResource.scala | 2 +-
.../spark/authz/ranger/RangerSparkExtension.scala | 10 ++-
.../spark/authz/ranger/RuleApplyRowFilter.scala | 62 ++++++++++++++++++
...arkAuthorizer.scala => RuleAuthorization.scala} | 38 +++--------
...rkPlugin.scala => SparkRangerAdminPlugin.scala} | 11 +++-
.../plugin/spark/authz/util/AuthZUtils.scala | 75 +++++++++++++++++++++
.../RowFilterMarker.scala} | 9 ++-
.../RuleEliminateRowFilterMarker.scala} | 11 +++-
.../src/test/resources/sparkSql_hive_jenkins.json | 58 +++++++++++++++++
.../spark/authz/PrivilegesBuilderSuite.scala | 29 +--------
.../plugin/spark/authz/SparkSessionProvider.scala | 61 +++++++++++++++++
...Suite.scala => RangerSparkExtensionSuite.scala} | 76 +++++++++++++++++-----
.../authz/ranger/SparkRangerAdminPluginSuite.scala | 42 ++++++++++++
16 files changed, 414 insertions(+), 112 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/README.md b/extensions/spark/kyuubi-spark-authz/README.md
index e0e2bdda9..06bf2d2b9 100644
--- a/extensions/spark/kyuubi-spark-authz/README.md
+++ b/extensions/spark/kyuubi-spark-authz/README.md
@@ -20,7 +20,7 @@
## Functions
- [x] Column-level fine-grained authorization
-- [ ] Row-level fine-grained authorization, a.k.a. Row-level filtering
+- [x] Row-level fine-grained authorization, a.k.a. Row-level filtering
- [ ] Data masking
## Build
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index e332d1903..f275877b3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -18,7 +18,6 @@
package org.apache.kyuubi.plugin.spark.authz
import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Success, Try}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -31,6 +30,7 @@ import org.apache.spark.sql.types.StructField
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType._
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
object PrivilegesBuilder {
@@ -50,22 +50,6 @@ object PrivilegesBuilder {
parts.map(quoteIfNeeded).mkString(".")
}
- /**
- * fixme error handling need improve here
- */
- private def getFieldVal[T](o: Any, name: String): T = {
- Try {
- val field = o.getClass.getDeclaredField(name)
- field.setAccessible(true)
- field.get(o)
- } match {
- case Success(value) => value.asInstanceOf[T]
- case Failure(e) =>
- val candidates = o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
- throw new RuntimeException(s"$name not in $candidates", e)
- }
- }
-
private def databasePrivileges(db: String): PrivilegeObject = {
PrivilegeObject(DATABASE, PrivilegeObjectActionType.OTHER, db, db)
}
@@ -124,11 +108,11 @@ object PrivilegesBuilder {
val cols = projectionList ++ collectLeaves(f.condition)
buildQuery(f.child, privilegeObjects, cols)
- case h if h.nodeName == "HiveTableRelation" =>
- mergeProjection(getFieldVal[CatalogTable](h, "tableMeta"), h)
+ case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
+ mergeProjection(getHiveTable(hiveTableRelation), hiveTableRelation)
- case l if l.nodeName == "LogicalRelation" =>
- getFieldVal[Option[CatalogTable]](l, "catalogTable").foreach { t =>
+ case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
+ getDatasourceTable(logicalRelation).foreach { t =>
mergeProjection(t, plan)
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
index c2d9e4149..d0b3e4fc4 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
@@ -24,15 +24,16 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.ranger.plugin.policyengine.{RangerAccessRequestImpl, RangerPolicyEngine}
+import org.apache.kyuubi.plugin.spark.authz.OperationType.OperationType
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType._
-case class AccessRequest(accessType: AccessType) extends RangerAccessRequestImpl
+case class AccessRequest private (accessType: AccessType) extends RangerAccessRequestImpl
object AccessRequest {
def apply(
resource: AccessResource,
user: UserGroupInformation,
- opType: String,
+ opType: OperationType,
accessType: AccessType): AccessRequest = {
val userName = user.getShortUserName
val groups = user.getGroupNames.toSet.asJava
@@ -40,13 +41,14 @@ object AccessRequest {
req.setResource(resource)
req.setUser(userName)
req.setUserGroups(groups)
+ req.setAction(opType.toString)
try {
- val getRoles = RangerSparkPlugin.getClass.getMethod(
+ val getRoles = SparkRangerAdminPlugin.getClass.getMethod(
"getRolesFromUserAndGroups",
classOf[String],
classOf[java.util.Set[String]])
getRoles.setAccessible(true)
- val roles = getRoles.invoke(RangerSparkPlugin, userName, groups)
+ val roles = getRoles.invoke(SparkRangerAdminPlugin, userName, groups)
val setRoles = req.getClass.getMethod("setUserRoles", classOf[java.util.Set[String]])
setRoles.setAccessible(true)
setRoles.invoke(req, roles)
@@ -59,9 +61,9 @@ object AccessRequest {
case _ => req.setAccessType(accessType.toString.toLowerCase)
}
try {
- val getClusterName = RangerSparkPlugin.getClass.getMethod("getClusterName")
+ val getClusterName = SparkRangerAdminPlugin.getClass.getMethod("getClusterName")
getClusterName.setAccessible(true)
- val clusterName = getClusterName.invoke(RangerSparkPlugin)
+ val clusterName = getClusterName.invoke(SparkRangerAdminPlugin)
val setClusterName = req.getClass.getMethod("setClusterName", classOf[String])
setClusterName.setAccessible(true)
setClusterName.invoke(req, clusterName)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
index 0fb1bc247..b26b8bf82 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
@@ -58,7 +58,7 @@ object AccessResource {
resource.setValue("database", firstLevelResource)
resource.setValue("table", secondLevelResource)
}
- resource.setServiceDef(RangerSparkPlugin.getServiceDef)
+ resource.setServiceDef(SparkRangerAdminPlugin.getServiceDef)
resource
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
index cfc376f2c..949f4260d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
@@ -19,11 +19,13 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.kyuubi.plugin.spark.authz.util.RuleEliminateRowFilterMarker
+
/**
* ACL Management for Apache Spark SQL with Apache Ranger, enabling:
* <ul>
* <li>Table/Column level authorization(yes)</li>
- * <li>Row level filtering(no)</li>
+ * <li>Row level filtering(yes)</li>
* <li>Data masking(no)</li>
* <ul>
*
@@ -34,9 +36,11 @@ import org.apache.spark.sql.SparkSessionExtensions
* @since 1.6.0
*/
class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
- RangerSparkPlugin.init()
+ SparkRangerAdminPlugin.init()
override def apply(v1: SparkSessionExtensions): Unit = {
- v1.injectOptimizerRule(new RangerSparkAuthorizer(_))
+ v1.injectResolutionRule(new RuleApplyRowFilter(_))
+ v1.injectPostHocResolutionRule(_ => new RuleEliminateRowFilterMarker())
+ v1.injectOptimizerRule(new RuleAuthorization(_))
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
new file mode 100644
index 000000000..2e3b1454e
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
+import org.apache.kyuubi.plugin.spark.authz.util.RowFilterMarker
+
+class RuleApplyRowFilter(spark: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan transformUp {
+ case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
+ val table = getHiveTable(hiveTableRelation)
+ applyFilter(hiveTableRelation, table, spark)
+ case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
+ val table = getDatasourceTable(logicalRelation)
+ if (table.isEmpty) {
+ logicalRelation
+ } else {
+ applyFilter(logicalRelation, table.get, spark)
+ }
+ }
+ }
+
+ private def applyFilter(
+ plan: LogicalPlan,
+ table: CatalogTable,
+ spark: SparkSession): LogicalPlan = {
+ val identifier = table.identifier
+ val ugi = getAuthzUgi(spark.sparkContext)
+ val opType = OperationType(plan.nodeName)
+ val are = AccessResource(ObjectType.TABLE, identifier.database.orNull, identifier.table, null)
+ val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
+ val filterExprStr = SparkRangerAdminPlugin.getFilterExpr(art)
+ if (filterExprStr.isEmpty) {
+ plan
+ } else {
+ val filterExpr = spark.sessionState.sqlParser.parseExpression(filterExprStr.get)
+ Filter(filterExpr, RowFilterMarker(plan))
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizer.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
similarity index 69%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizer.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
index 1121cb1c2..4ed1fadc3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizer.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
@@ -19,49 +19,31 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.kyuubi.plugin.spark.authz.{ranger, ObjectType, OperationType, PrivilegeObject, PrivilegesBuilder}
+import org.apache.kyuubi.plugin.spark.authz.{ObjectType, _}
import org.apache.kyuubi.plugin.spark.authz.ObjectType._
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
-class RangerSparkAuthorizer(spark: SparkSession) extends Rule[LogicalPlan] {
+class RuleAuthorization(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- RangerSparkAuthorizer.checkPrivileges(spark, plan)
+ RuleAuthorization.checkPrivileges(spark, plan)
plan
}
}
-object RangerSparkAuthorizer {
+object RuleAuthorization {
- /**
- * Get the active session user
- * @param spark spark context instance
- * @return the user name
- */
- private def getAuthzUgi(spark: SparkContext): UserGroupInformation = {
- // kyuubi.session.user is only used by kyuubi
- val user = spark.getLocalProperty("kyuubi.session.user")
- if (user != null) {
- UserGroupInformation.createRemoteUser(user)
- } else {
- UserGroupInformation.getCurrentUser
- }
- }
-
- def checkPrivileges(
- spark: SparkSession,
- plan: LogicalPlan): Unit = {
+ def checkPrivileges(spark: SparkSession, plan: LogicalPlan): Unit = {
val ugi = getAuthzUgi(spark.sparkContext)
val opType = OperationType(plan.nodeName)
val (inputs, outputs) = PrivilegesBuilder.build(plan)
val requests = new ArrayBuffer[AccessRequest]()
if (inputs.isEmpty && opType == OperationType.SHOWDATABASES) {
val resource = AccessResource(DATABASE, null)
- requests += AccessRequest(resource, ugi, opType.toString, AccessType.USE)
+ requests += AccessRequest(resource, ugi, opType, AccessType.USE)
}
def addAccessRequest(objects: Seq[PrivilegeObject], isInput: Boolean): Unit = {
@@ -70,7 +52,7 @@ object RangerSparkAuthorizer {
val accessType = ranger.AccessType(obj, opType, isInput)
if (accessType != AccessType.NONE && !requests.exists(o =>
o.accessType == accessType && o.getResource == resource)) {
- requests += AccessRequest(resource, ugi, opType.toString, accessType)
+ requests += AccessRequest(resource, ugi, opType, accessType)
}
}
}
@@ -84,7 +66,7 @@ object RangerSparkAuthorizer {
case ObjectType.COLUMN if resource.getColumns.nonEmpty =>
resource.getColumns.foreach { col =>
val cr = AccessResource(COLUMN, resource.getDatabase, resource.getTable, col)
- val req = AccessRequest(cr, ugi, opType.toString, request.accessType)
+ val req = AccessRequest(cr, ugi, opType, request.accessType)
verify(req)
}
case _ => verify(request)
@@ -93,7 +75,7 @@ object RangerSparkAuthorizer {
}
private def verify(req: AccessRequest): Unit = {
- val ret = RangerSparkPlugin.isAccessAllowed(req, null)
+ val ret = SparkRangerAdminPlugin.isAccessAllowed(req, null)
if (ret != null && !ret.getIsAllowed) {
throw new RuntimeException(
s"Permission denied: user [${req.getUser}] does not have [${req.getAccessType}] privilege" +
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
similarity index 73%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
copy to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
index c1b68d938..b1471b7e5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
@@ -19,4 +19,13 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.ranger.plugin.service.RangerBasePlugin
-object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql")
+object SparkRangerAdminPlugin extends RangerBasePlugin("spark", "sparkSql") {
+
+ def getFilterExpr(req: AccessRequest): Option[String] = {
+ val result = evalRowFilterPolicies(req, null)
+ Option(result)
+ .filter(_.isRowFilterEnabled)
+ .map(_.getFilterExpr)
+ .filter(fe => fe != null && fe.nonEmpty)
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
new file mode 100644
index 000000000..e2e691fb3
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.util
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+private[authz] object AuthZUtils {
+
+ /**
+ * fixme error handling need improve here
+ */
+ def getFieldVal[T](o: Any, name: String): T = {
+ Try {
+ val field = o.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.get(o)
+ } match {
+ case Success(value) => value.asInstanceOf[T]
+ case Failure(e) =>
+ val candidates = o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
+ throw new RuntimeException(s"$name not in $candidates", e)
+ }
+ }
+
+ /**
+ * Get the active session user
+ * @param spark spark context instance
+ * @return the user name
+ */
+ def getAuthzUgi(spark: SparkContext): UserGroupInformation = {
+ // kyuubi.session.user is only used by kyuubi
+ val user = spark.getLocalProperty("kyuubi.session.user")
+ if (user != null && user != UserGroupInformation.getCurrentUser.getShortUserName) {
+ UserGroupInformation.createRemoteUser(user)
+ } else {
+ UserGroupInformation.getCurrentUser
+ }
+ }
+
+ def hasResolvedHiveTable(plan: LogicalPlan): Boolean = {
+ plan.nodeName == "HiveTableRelation" && plan.resolved
+ }
+
+ def getHiveTable(plan: LogicalPlan): CatalogTable = {
+ getFieldVal[CatalogTable](plan, "tableMeta")
+ }
+
+ def hasResolvedDatasourceTable(plan: LogicalPlan): Boolean = {
+ plan.nodeName == "LogicalRelation" && plan.resolved
+ }
+
+ def getDatasourceTable(plan: LogicalPlan): Option[CatalogTable] = {
+ getFieldVal[Option[CatalogTable]](plan, "catalogTable")
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
similarity index 72%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
copy to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
index c1b68d938..6b2e2d4d9 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
@@ -15,8 +15,11 @@
* limitations under the License.
*/
-package org.apache.kyuubi.plugin.spark.authz.ranger
+package org.apache.kyuubi.plugin.spark.authz.util
-import org.apache.ranger.plugin.service.RangerBasePlugin
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql")
+case class RowFilterMarker(table: LogicalPlan) extends LeafNode {
+ override def output: Seq[Attribute] = table.output
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
similarity index 69%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
index c1b68d938..4ec8bea38 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
@@ -15,8 +15,13 @@
* limitations under the License.
*/
-package org.apache.kyuubi.plugin.spark.authz.ranger
+package org.apache.kyuubi.plugin.spark.authz.util
-import org.apache.ranger.plugin.service.RangerBasePlugin
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
-object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql")
+class RuleEliminateRowFilterMarker extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformUp { case rf: RowFilterMarker => rf.table }
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index a684c13f5..62a7bda2b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -463,6 +463,64 @@
"guid": "fd24db19-f7cc-4e13-a8ba-bbd5a07a2d8d",
"isEnabled": true,
"version": 5
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "src_key _less_than_20",
+ "policyType": 2,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "src"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [
+ {
+ "rowFilterInfo": {
+ "filterExpr": "key\u003c20"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "serviceType": "hive",
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 4,
+ "guid": "f588a9ed-f7b1-48f7-9d0d-c12cf2b9b7ed",
+ "isEnabled": true,
+ "version": 26
}
],
"serviceDef": {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index e0d461553..252332e4e 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -18,38 +18,13 @@
package org.apache.kyuubi.plugin.spark.authz
import org.apache.commons.lang3.StringUtils
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
-abstract class PrivilegesBuilderSuite extends KyuubiFunSuite {
-
- protected val catalogImpl: String
-
- protected val isSparkV2: Boolean = SPARK_VERSION.split("\\.").head == "2"
- protected val isSparkV31OrGreater: Boolean = {
- val parts = SPARK_VERSION.split("\\.").map(_.toInt)
- (parts.head > 3) || (parts.head == 3 && parts(1) >= 1)
- }
- protected val isSparkV32OrGreater: Boolean = {
- val parts = SPARK_VERSION.split("\\.").map(_.toInt)
- (parts.head > 3) || (parts.head == 3 && parts(1) >= 2)
- }
-
- protected lazy val spark: SparkSession = SparkSession.builder()
- .master("local")
- .config("spark.ui.enabled", "false")
- .config(
- "spark.sql.warehouse.dir",
- Utils.createTempDir(namePrefix = "spark-warehouse").toString)
- .config("spark.sql.catalogImplementation", catalogImpl)
- .getOrCreate()
-
- protected val sql: String => DataFrame = spark.sql
+abstract class PrivilegesBuilderSuite extends KyuubiFunSuite with SparkSessionProvider {
protected def withTable(t: String)(f: String => Unit): Unit = {
try {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
new file mode 100644
index 000000000..607d8257b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import java.nio.file.Files
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions}
+
+import org.apache.kyuubi.Utils
+
+trait SparkSessionProvider {
+ protected val catalogImpl: String
+ protected def format: String = if (catalogImpl == "hive") "hive" else "parquet"
+ protected val isSparkV2: Boolean = SPARK_VERSION.split("\\.").head == "2"
+ protected val isSparkV31OrGreater: Boolean = {
+ val parts = SPARK_VERSION.split("\\.").map(_.toInt)
+ (parts.head > 3) || (parts.head == 3 && parts(1) >= 1)
+ }
+ protected val isSparkV32OrGreater: Boolean = {
+ val parts = SPARK_VERSION.split("\\.").map(_.toInt)
+ (parts.head > 3) || (parts.head == 3 && parts(1) >= 2)
+ }
+
+ protected val extension: SparkSessionExtensions => Unit = _ => Unit
+ protected lazy val spark: SparkSession = {
+ val metastore = {
+ val path = Utils.createTempDir(namePrefix = "hms")
+ Files.delete(path)
+ path
+ }
+ SparkSession.builder()
+ .master("local")
+ .config("spark.ui.enabled", "false")
+ .config("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
+ .config("spark.sql.catalogImplementation", catalogImpl)
+ .config(
+ "spark.sql.warehouse.dir",
+ Utils.createTempDir(namePrefix = "spark-warehouse").toString)
+ .withExtensions(extension)
+ .getOrCreate()
+ }
+
+ protected val sql: String => DataFrame = spark.sql
+
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizerSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
similarity index 65%
rename from extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizerSuite.scala
rename to extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 4ec3422c1..e3962609c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizerSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -22,23 +22,14 @@ import java.security.PrivilegedExceptionAction
import scala.util.Try
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{Row, SparkSessionExtensions}
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
-class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
+abstract class RangerSparkExtensionSuite extends KyuubiFunSuite with SparkSessionProvider {
- private lazy val spark: SparkSession = SparkSession.builder()
- .master("local")
- .withExtensions(new RangerSparkExtension)
- .config("spark.ui.enabled", "false")
- .config(
- "spark.sql.warehouse.dir",
- Utils.createTempDir(namePrefix = "spark-warehouse").toString)
- .config("spark.sql.catalogImplementation", "in-memory")
- .getOrCreate()
-
- private val sql: String => DataFrame = spark.sql
+ override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension // passed to .withExtensions by SparkSessionProvider
private def doAs[T](user: String, f: => T): T = {
UserGroupInformation.createRemoteUser(user).doAs[T](
@@ -59,7 +50,7 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
s"Permission denied: user [$user] does not have [$privilege] privilege on [$resource]"
}
- test("databases") {
+ test("auth: databases") {
val testDb = "mydb"
val create = s"CREATE DATABASE IF NOT EXISTS $testDb"
val alter = s"ALTER DATABASE $testDb SET DBPROPERTIES (abc = '123')"
@@ -73,19 +64,19 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
val e1 = intercept[RuntimeException](sql(alter))
assert(e1.getMessage === errorMessage("alter", "mydb"))
val e2 = intercept[RuntimeException](sql(drop))
- assert(e2.getMessage === (errorMessage("drop", "mydb")))
+ assert(e2.getMessage === errorMessage("drop", "mydb"))
doAs("kent", Try(sql("SHOW DATABASES")).isSuccess)
} finally {
doAs("admin", sql(drop))
}
}
- test("tables") {
+ test("auth: tables") {
val db = "default"
val table = "src"
val col = "key"
- val create0 = s"CREATE TABLE IF NOT EXISTS $db.$table ($col int, value int) USING parquet"
+ val create0 = s"CREATE TABLE IF NOT EXISTS $db.$table ($col int, value int) USING $format"
val alter0 = s"ALTER TABLE $db.$table SET TBLPROPERTIES(key='ak')"
val drop0 = s"DROP TABLE IF EXISTS $db.$table"
val select = s"SELECT * FROM $db.$table"
@@ -123,7 +114,7 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
}
}
- test("functions") {
+ test("auth: functions") {
val db = "default"
val func = "func"
val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'"
@@ -134,4 +125,53 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
})
doAs("admin", assert(Try(sql(create0)).isSuccess))
}
+
+ test("row level filter") {
+ val db = "default"
+ val table = "src"
+ val col = "key"
+ val derived = s"$db.${table}2" // CTAS target: created as bob below, dropped in `finally`
+ val create = s"CREATE TABLE IF NOT EXISTS $db.$table ($col int, value int) USING $format"
+ try {
+ doAs("admin", assert(Try { sql(create) }.isSuccess))
+ doAs("admin", sql(s"INSERT INTO $db.$table SELECT 1, 1"))
+ doAs("admin", sql(s"INSERT INTO $db.$table SELECT 20, 2"))
+ doAs("admin", sql(s"INSERT INTO $db.$table SELECT 30, 3"))
+ doAs(
+ "kent", // kent is expected to see every row, i.e. no row filter applies
+ assert(sql(s"SELECT $col FROM $db.$table order by $col").collect() ===
+ Seq(Row(1), Row(20), Row(30))))
+ // bob's filter ("key<20") must hold through projections, aliases, aggregates and subqueries
+ Seq(
+ s"SELECT value FROM $db.$table",
+ s"SELECT value as key FROM $db.$table",
+ s"SELECT max(value) FROM $db.$table",
+ s"SELECT coalesce(max(value), 1) FROM $db.$table",
+ s"SELECT value FROM $db.$table WHERE value in (SELECT value as key FROM $db.$table)")
+ .foreach { q =>
+ doAs(
+ "bob", {
+ withClue(q) {
+ assert(sql(q).collect() === Seq(Row(1)))
+ }
+ })
+ }
+ doAs(
+ "bob", { // a table derived by bob via CTAS must hold only the rows bob can see
+ sql(s"CREATE TABLE $derived using $format AS SELECT value FROM $db.$table")
+ assert(sql(s"SELECT value FROM $derived").collect() === Seq(Row(1)))
+ })
+ } finally {
+ doAs("admin", sql(s"DROP TABLE IF EXISTS $derived"))
+ doAs("admin", sql(s"DROP TABLE IF EXISTS $db.$table"))
+ }
+ }
+}
+
+class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { // runs the shared tests on the in-memory catalog
+ override protected val catalogImpl: String = "in-memory"
+}
+
+class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { // runs the shared tests on the Hive catalog
+ override protected val catalogImpl: String = "hive"
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
new file mode 100644
index 000000000..ae1679579
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
+import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
+
+class SparkRangerAdminPluginSuite extends KyuubiFunSuite {
+
+ test("get filter expression") {
+ // Row filters are resolved for a SELECT query on default.src.
+ val resource = AccessResource(ObjectType.TABLE, "default", "src", null)
+ def filterFor(user: String): Option[String] = {
+ val ugi = UserGroupInformation.createRemoteUser(user)
+ getFilterExpr(AccessRequest(resource, ugi, OperationType.QUERY, AccessType.SELECT))
+ }
+ // Only bob is expected to be subject to a row-filter policy on this table.
+ assert(filterFor("bob") === Some("key<20"))
+ // Users without a row-filter policy get no filter expression at all.
+ Seq("admin", "alice").foreach { user =>
+ assert(filterFor(user).isEmpty, s"unexpected row filter for $user")
+ }
+ }
+}