Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/06 11:44:48 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1451] Add Row-level filtering support

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4a608f99 [KYUUBI #1451] Add Row-level filtering support
c4a608f99 is described below

commit c4a608f996cb3f0ede619595f5ef9b29a34a3ed9
Author: Kent Yao <ya...@apache.org>
AuthorDate: Wed Apr 6 19:44:40 2022 +0800

    [KYUUBI #1451] Add Row-level filtering support
    
    ### _Why are the changes needed?_
    
    Add row-level filtering (a.k.a. row-level fine-grained authorization) support
    to the kyuubi-spark-authz plugin, driven by Apache Ranger row-filter policies.
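
    For example, with a Ranger row-filter policy that attaches the expression
    key<20 to table default.src for user bob (such a policy is added to the test
    resources in this commit), queries issued by bob are transparently rewritten
    to return only matching rows. A minimal sketch of the observable behavior,
    assuming that policy is in place:

        // as "bob": the injected row filter keeps only rows with key < 20
        spark.sql("SELECT key FROM default.src").show()
        // as "admin": no row-filter policy matches, so all rows come back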
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including positive and negative cases where possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2267 from yaooqinn/rowfiltering.
    
    Closes #1451
    
    58f65146 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
    5cc0ccad [Kent Yao] [KYUUBI #1451] Add Row level filtering support
    ac26aab9 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
    534912c3 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
    74ca8187 [Kent Yao] [KYUUBI #1451] Add Row level filtering support
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 extensions/spark/kyuubi-spark-authz/README.md      |  2 +-
 .../plugin/spark/authz/PrivilegesBuilder.scala     | 26 ++------
 .../plugin/spark/authz/ranger/AccessRequest.scala  | 14 ++--
 .../plugin/spark/authz/ranger/AccessResource.scala |  2 +-
 .../spark/authz/ranger/RangerSparkExtension.scala  | 10 ++-
 .../spark/authz/ranger/RuleApplyRowFilter.scala    | 62 ++++++++++++++++++
 ...arkAuthorizer.scala => RuleAuthorization.scala} | 38 +++--------
 ...rkPlugin.scala => SparkRangerAdminPlugin.scala} | 11 +++-
 .../plugin/spark/authz/util/AuthZUtils.scala       | 75 +++++++++++++++++++++
 .../RowFilterMarker.scala}                         |  9 ++-
 .../RuleEliminateRowFilterMarker.scala}            | 11 +++-
 .../src/test/resources/sparkSql_hive_jenkins.json  | 58 +++++++++++++++++
 .../spark/authz/PrivilegesBuilderSuite.scala       | 29 +--------
 .../plugin/spark/authz/SparkSessionProvider.scala  | 61 +++++++++++++++++
 ...Suite.scala => RangerSparkExtensionSuite.scala} | 76 +++++++++++++++++-----
 .../authz/ranger/SparkRangerAdminPluginSuite.scala | 42 ++++++++++++
 16 files changed, 414 insertions(+), 112 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/README.md b/extensions/spark/kyuubi-spark-authz/README.md
index e0e2bdda9..06bf2d2b9 100644
--- a/extensions/spark/kyuubi-spark-authz/README.md
+++ b/extensions/spark/kyuubi-spark-authz/README.md
@@ -20,7 +20,7 @@
 ## Functions
 
 - [x] Column-level fine-grained authorization
-- [ ] Row-level fine-grained authorization, a.k.a. Row-level filtering
+- [x] Row-level fine-grained authorization, a.k.a. Row-level filtering
 - [ ] Data masking
 
 ## Build
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index e332d1903..f275877b3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -18,7 +18,6 @@
 package org.apache.kyuubi.plugin.spark.authz
 
 import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -31,6 +30,7 @@ import org.apache.spark.sql.types.StructField
 
 import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
 import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectType._
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 
 object PrivilegesBuilder {
 
@@ -50,22 +50,6 @@ object PrivilegesBuilder {
     parts.map(quoteIfNeeded).mkString(".")
   }
 
-  /**
-   * fixme error handling need improve here
-   */
-  private def getFieldVal[T](o: Any, name: String): T = {
-    Try {
-      val field = o.getClass.getDeclaredField(name)
-      field.setAccessible(true)
-      field.get(o)
-    } match {
-      case Success(value) => value.asInstanceOf[T]
-      case Failure(e) =>
-        val candidates = o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
-        throw new RuntimeException(s"$name not in $candidates", e)
-    }
-  }
-
   private def databasePrivileges(db: String): PrivilegeObject = {
     PrivilegeObject(DATABASE, PrivilegeObjectActionType.OTHER, db, db)
   }
@@ -124,11 +108,11 @@ object PrivilegesBuilder {
         val cols = projectionList ++ collectLeaves(f.condition)
         buildQuery(f.child, privilegeObjects, cols)
 
-      case h if h.nodeName == "HiveTableRelation" =>
-        mergeProjection(getFieldVal[CatalogTable](h, "tableMeta"), h)
+      case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
+        mergeProjection(getHiveTable(hiveTableRelation), hiveTableRelation)
 
-      case l if l.nodeName == "LogicalRelation" =>
-        getFieldVal[Option[CatalogTable]](l, "catalogTable").foreach { t =>
+      case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
+        getDatasourceTable(logicalRelation).foreach { t =>
           mergeProjection(t, plan)
         }
 
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
index c2d9e4149..d0b3e4fc4 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessRequest.scala
@@ -24,15 +24,16 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.ranger.plugin.policyengine.{RangerAccessRequestImpl, RangerPolicyEngine}
 
+import org.apache.kyuubi.plugin.spark.authz.OperationType.OperationType
 import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType._
 
-case class AccessRequest(accessType: AccessType) extends RangerAccessRequestImpl
+case class AccessRequest private (accessType: AccessType) extends RangerAccessRequestImpl
 
 object AccessRequest {
   def apply(
       resource: AccessResource,
       user: UserGroupInformation,
-      opType: String,
+      opType: OperationType,
       accessType: AccessType): AccessRequest = {
     val userName = user.getShortUserName
     val groups = user.getGroupNames.toSet.asJava
@@ -40,13 +41,14 @@ object AccessRequest {
     req.setResource(resource)
     req.setUser(userName)
     req.setUserGroups(groups)
+    req.setAction(opType.toString)
     try {
-      val getRoles = RangerSparkPlugin.getClass.getMethod(
+      val getRoles = SparkRangerAdminPlugin.getClass.getMethod(
         "getRolesFromUserAndGroups",
         classOf[String],
         classOf[java.util.Set[String]])
       getRoles.setAccessible(true)
-      val roles = getRoles.invoke(RangerSparkPlugin, userName, groups)
+      val roles = getRoles.invoke(SparkRangerAdminPlugin, userName, groups)
       val setRoles = req.getClass.getMethod("setUserRoles", classOf[java.util.Set[String]])
       setRoles.setAccessible(true)
       setRoles.invoke(req, roles)
@@ -59,9 +61,9 @@ object AccessRequest {
       case _ => req.setAccessType(accessType.toString.toLowerCase)
     }
     try {
-      val getClusterName = RangerSparkPlugin.getClass.getMethod("getClusterName")
+      val getClusterName = SparkRangerAdminPlugin.getClass.getMethod("getClusterName")
       getClusterName.setAccessible(true)
-      val clusterName = getClusterName.invoke(RangerSparkPlugin)
+      val clusterName = getClusterName.invoke(SparkRangerAdminPlugin)
       val setClusterName = req.getClass.getMethod("setClusterName", classOf[String])
       setClusterName.setAccessible(true)
       setClusterName.invoke(req, clusterName)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
index 0fb1bc247..b26b8bf82 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
@@ -58,7 +58,7 @@ object AccessResource {
         resource.setValue("database", firstLevelResource)
         resource.setValue("table", secondLevelResource)
     }
-    resource.setServiceDef(RangerSparkPlugin.getServiceDef)
+    resource.setServiceDef(SparkRangerAdminPlugin.getServiceDef)
     resource
   }
 
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
index cfc376f2c..949f4260d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
@@ -19,11 +19,13 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.spark.sql.SparkSessionExtensions
 
+import org.apache.kyuubi.plugin.spark.authz.util.RuleEliminateRowFilterMarker
+
 /**
  * ACL Management for Apache Spark SQL with Apache Ranger, enabling:
  * <ul>
  *   <li>Table/Column level authorization(yes)</li>
- *   <li>Row level filtering(no)</li>
+ *   <li>Row level filtering(yes)</li>
  *   <li>Data masking(no)</li>
  * <ul>
  *
@@ -34,9 +36,11 @@ import org.apache.spark.sql.SparkSessionExtensions
  *  @since 1.6.0
  */
 class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
-  RangerSparkPlugin.init()
+  SparkRangerAdminPlugin.init()
 
   override def apply(v1: SparkSessionExtensions): Unit = {
-    v1.injectOptimizerRule(new RangerSparkAuthorizer(_))
+    v1.injectResolutionRule(new RuleApplyRowFilter(_))
+    v1.injectPostHocResolutionRule(_ => new RuleEliminateRowFilterMarker())
+    v1.injectOptimizerRule(new RuleAuthorization(_))
   }
 }
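
The three injected rules split row-level filtering across query compilation:
RuleApplyRowFilter rewrites resolved relations during analysis,
RuleEliminateRowFilterMarker strips the bookkeeping marker post hoc, and
RuleAuthorization checks privileges at optimization time. The extension itself
is activated through Spark's standard spark.sql.extensions config; a minimal
sketch, assuming the kyuubi-spark-authz jar and the Ranger client
configuration are on the application classpath:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config(
        "spark.sql.extensions",
        "org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension")
      .getOrCreate()
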
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
new file mode 100644
index 000000000..2e3b1454e
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
+import org.apache.kyuubi.plugin.spark.authz.util.RowFilterMarker
+
+class RuleApplyRowFilter(spark: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformUp {
+      case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
+        val table = getHiveTable(hiveTableRelation)
+        applyFilter(hiveTableRelation, table, spark)
+      case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
+        val table = getDatasourceTable(logicalRelation)
+        if (table.isEmpty) {
+          logicalRelation
+        } else {
+          applyFilter(logicalRelation, table.get, spark)
+        }
+    }
+  }
+
+  private def applyFilter(
+      plan: LogicalPlan,
+      table: CatalogTable,
+      spark: SparkSession): LogicalPlan = {
+    val identifier = table.identifier
+    val ugi = getAuthzUgi(spark.sparkContext)
+    val opType = OperationType(plan.nodeName)
+    val are = AccessResource(ObjectType.TABLE, identifier.database.orNull, identifier.table, null)
+    val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
+    val filterExprStr = SparkRangerAdminPlugin.getFilterExpr(art)
+    if (filterExprStr.isEmpty) {
+      plan
+    } else {
+      val filterExpr = spark.sessionState.sqlParser.parseExpression(filterExprStr.get)
+      Filter(filterExpr, RowFilterMarker(plan))
+    }
+  }
+}
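
For a relation matched by an enabled Ranger row-filter policy, the rule parses
the policy's filter expression with the session's SQL parser and wraps the
relation in a Filter. A before/after sketch of the analyzed plan for user bob
under a key<20 policy on default.src (node names abridged; an illustration,
not exact Spark output):

    before:  Project [key]
             +- HiveTableRelation default.src

    after:   Project [key]
             +- Filter (key < 20)
                +- RowFilterMarker
                   +- HiveTableRelation default.src
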
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizer.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
similarity index 69%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizer.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
index 1121cb1c2..4ed1fadc3 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizer.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
@@ -19,49 +19,31 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
-import org.apache.kyuubi.plugin.spark.authz.{ranger, ObjectType, OperationType, PrivilegeObject, PrivilegesBuilder}
+import org.apache.kyuubi.plugin.spark.authz.{ObjectType, _}
 import org.apache.kyuubi.plugin.spark.authz.ObjectType._
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 
-class RangerSparkAuthorizer(spark: SparkSession) extends Rule[LogicalPlan] {
+class RuleAuthorization(spark: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    RangerSparkAuthorizer.checkPrivileges(spark, plan)
+    RuleAuthorization.checkPrivileges(spark, plan)
     plan
   }
 }
 
-object RangerSparkAuthorizer {
+object RuleAuthorization {
 
-  /**
-   * Get the active session user
-   * @param spark spark context instance
-   * @return the user name
-   */
-  private def getAuthzUgi(spark: SparkContext): UserGroupInformation = {
-    // kyuubi.session.user is only used by kyuubi
-    val user = spark.getLocalProperty("kyuubi.session.user")
-    if (user != null) {
-      UserGroupInformation.createRemoteUser(user)
-    } else {
-      UserGroupInformation.getCurrentUser
-    }
-  }
-
-  def checkPrivileges(
-      spark: SparkSession,
-      plan: LogicalPlan): Unit = {
+  def checkPrivileges(spark: SparkSession, plan: LogicalPlan): Unit = {
     val ugi = getAuthzUgi(spark.sparkContext)
     val opType = OperationType(plan.nodeName)
     val (inputs, outputs) = PrivilegesBuilder.build(plan)
     val requests = new ArrayBuffer[AccessRequest]()
     if (inputs.isEmpty && opType == OperationType.SHOWDATABASES) {
       val resource = AccessResource(DATABASE, null)
-      requests += AccessRequest(resource, ugi, opType.toString, AccessType.USE)
+      requests += AccessRequest(resource, ugi, opType, AccessType.USE)
     }
 
     def addAccessRequest(objects: Seq[PrivilegeObject], isInput: Boolean): Unit = {
@@ -70,7 +52,7 @@ object RangerSparkAuthorizer {
         val accessType = ranger.AccessType(obj, opType, isInput)
         if (accessType != AccessType.NONE && !requests.exists(o =>
             o.accessType == accessType && o.getResource == resource)) {
-          requests += AccessRequest(resource, ugi, opType.toString, accessType)
+          requests += AccessRequest(resource, ugi, opType, accessType)
         }
       }
     }
@@ -84,7 +66,7 @@ object RangerSparkAuthorizer {
         case ObjectType.COLUMN if resource.getColumns.nonEmpty =>
           resource.getColumns.foreach { col =>
             val cr = AccessResource(COLUMN, resource.getDatabase, resource.getTable, col)
-            val req = AccessRequest(cr, ugi, opType.toString, request.accessType)
+            val req = AccessRequest(cr, ugi, opType, request.accessType)
             verify(req)
           }
         case _ => verify(request)
@@ -93,7 +75,7 @@ object RangerSparkAuthorizer {
   }
 
   private def verify(req: AccessRequest): Unit = {
-    val ret = RangerSparkPlugin.isAccessAllowed(req, null)
+    val ret = SparkRangerAdminPlugin.isAccessAllowed(req, null)
     if (ret != null && !ret.getIsAllowed) {
       throw new RuntimeException(
         s"Permission denied: user [${req.getUser}] does not have [${req.getAccessType}] privilege" +
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
similarity index 73%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
copy to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
index c1b68d938..b1471b7e5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
@@ -19,4 +19,13 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.ranger.plugin.service.RangerBasePlugin
 
-object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql")
+object SparkRangerAdminPlugin extends RangerBasePlugin("spark", "sparkSql") {
+
+  def getFilterExpr(req: AccessRequest): Option[String] = {
+    val result = evalRowFilterPolicies(req, null)
+    Option(result)
+      .filter(_.isRowFilterEnabled)
+      .map(_.getFilterExpr)
+      .filter(fe => fe != null && fe.nonEmpty)
+  }
+}
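
getFilterExpr collapses the result of RangerBasePlugin.evalRowFilterPolicies
to an Option: Some(expression) only when a matching row-filter policy is
enabled and carries a non-empty filter expression, None otherwise, so callers
can leave unaffected plans untouched. A usage sketch, mirroring the new
SparkRangerAdminPluginSuite added below:

    val ugi = UserGroupInformation.createRemoteUser("bob")
    val resource = AccessResource(ObjectType.TABLE, "default", "src", null)
    val request = AccessRequest(resource, ugi, OperationType.QUERY, AccessType.SELECT)
    // Some("key<20") when a row-filter policy matches the user, otherwise None
    val filter: Option[String] = SparkRangerAdminPlugin.getFilterExpr(request)
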
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
new file mode 100644
index 000000000..e2e691fb3
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.util
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+private[authz] object AuthZUtils {
+
+  /**
+   * fixme error handling need improve here
+   */
+  def getFieldVal[T](o: Any, name: String): T = {
+    Try {
+      val field = o.getClass.getDeclaredField(name)
+      field.setAccessible(true)
+      field.get(o)
+    } match {
+      case Success(value) => value.asInstanceOf[T]
+      case Failure(e) =>
+        val candidates = o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
+        throw new RuntimeException(s"$name not in $candidates", e)
+    }
+  }
+
+  /**
+   * Get the active session user
+   * @param spark spark context instance
+   * @return the user name
+   */
+  def getAuthzUgi(spark: SparkContext): UserGroupInformation = {
+    // kyuubi.session.user is only used by kyuubi
+    val user = spark.getLocalProperty("kyuubi.session.user")
+    if (user != null && user != UserGroupInformation.getCurrentUser.getShortUserName) {
+      UserGroupInformation.createRemoteUser(user)
+    } else {
+      UserGroupInformation.getCurrentUser
+    }
+  }
+
+  def hasResolvedHiveTable(plan: LogicalPlan): Boolean = {
+    plan.nodeName == "HiveTableRelation" && plan.resolved
+  }
+
+  def getHiveTable(plan: LogicalPlan): CatalogTable = {
+    getFieldVal[CatalogTable](plan, "tableMeta")
+  }
+
+  def hasResolvedDatasourceTable(plan: LogicalPlan): Boolean = {
+    plan.nodeName == "LogicalRelation" && plan.resolved
+  }
+
+  def getDatasourceTable(plan: LogicalPlan): Option[CatalogTable] = {
+    getFieldVal[Option[CatalogTable]](plan, "catalogTable")
+  }
+}
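
The kyuubi.session.user local property is how a Kyuubi engine tags Spark jobs
with the end user, so authorization decisions target the session user rather
than the user running the engine process. A sketch of that hand-off, assuming
the property has been set by the engine:

    // the engine publishes the session user as a SparkContext local property...
    spark.sparkContext.setLocalProperty("kyuubi.session.user", "bob")
    // ...and authz resolves it to a UGI distinct from the process user
    val ugi = AuthZUtils.getAuthzUgi(spark.sparkContext) // remote user "bob"
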
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
similarity index 72%
copy from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
copy to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
index c1b68d938..6b2e2d4d9 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
@@ -15,8 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.ranger
+package org.apache.kyuubi.plugin.spark.authz.util
 
-import org.apache.ranger.plugin.service.RangerBasePlugin
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
-object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql")
+case class RowFilterMarker(table: LogicalPlan) extends LeafNode {
+  override def output: Seq[Attribute] = table.output
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
similarity index 69%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
index c1b68d938..4ec8bea38 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
@@ -15,8 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.kyuubi.plugin.spark.authz.ranger
+package org.apache.kyuubi.plugin.spark.authz.util
 
-import org.apache.ranger.plugin.service.RangerBasePlugin
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 
-object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql")
+class RuleEliminateRowFilterMarker extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformUp { case rf: RowFilterMarker => rf.table }
+  }
+}
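
Because RowFilterMarker is a LeafNode, transformUp never descends into the
relation it wraps, so repeated analyzer passes of RuleApplyRowFilter cannot
stack duplicate filters onto an already-filtered relation. Once resolution
settles, this post-hoc rule strips the marker and restores the plain relation
beneath the injected Filter; a sketch of the net effect:

    Filter(key < 20, RowFilterMarker(relation))   // after RuleApplyRowFilter
    Filter(key < 20, relation)                    // after RuleEliminateRowFilterMarker
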
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index a684c13f5..62a7bda2b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -463,6 +463,64 @@
       "guid": "fd24db19-f7cc-4e13-a8ba-bbd5a07a2d8d",
       "isEnabled": true,
       "version": 5
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "src_key _less_than_20",
+      "policyType": 2,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "src"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [
+        {
+          "rowFilterInfo": {
+            "filterExpr": "key\u003c20"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "serviceType": "hive",
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 4,
+      "guid": "f588a9ed-f7b1-48f7-9d0d-c12cf2b9b7ed",
+      "isEnabled": true,
+      "version": 26
     }
   ],
   "serviceDef": {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index e0d461553..252332e4e 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -18,38 +18,13 @@
 package org.apache.kyuubi.plugin.spark.authz
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
-import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.plugin.spark.authz.OperationType._
 import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
 
-abstract class PrivilegesBuilderSuite extends KyuubiFunSuite {
-
-  protected val catalogImpl: String
-
-  protected val isSparkV2: Boolean = SPARK_VERSION.split("\\.").head == "2"
-  protected val isSparkV31OrGreater: Boolean = {
-    val parts = SPARK_VERSION.split("\\.").map(_.toInt)
-    (parts.head > 3) || (parts.head == 3 && parts(1) >= 1)
-  }
-  protected val isSparkV32OrGreater: Boolean = {
-    val parts = SPARK_VERSION.split("\\.").map(_.toInt)
-    (parts.head > 3) || (parts.head == 3 && parts(1) >= 2)
-  }
-
-  protected lazy val spark: SparkSession = SparkSession.builder()
-    .master("local")
-    .config("spark.ui.enabled", "false")
-    .config(
-      "spark.sql.warehouse.dir",
-      Utils.createTempDir(namePrefix = "spark-warehouse").toString)
-    .config("spark.sql.catalogImplementation", catalogImpl)
-    .getOrCreate()
-
-  protected val sql: String => DataFrame = spark.sql
+abstract class PrivilegesBuilderSuite extends KyuubiFunSuite with SparkSessionProvider {
 
   protected def withTable(t: String)(f: String => Unit): Unit = {
     try {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
new file mode 100644
index 000000000..607d8257b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import java.nio.file.Files
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions}
+
+import org.apache.kyuubi.Utils
+
+trait SparkSessionProvider {
+  protected val catalogImpl: String
+  protected def format: String = if (catalogImpl == "hive") "hive" else "parquet"
+  protected val isSparkV2: Boolean = SPARK_VERSION.split("\\.").head == "2"
+  protected val isSparkV31OrGreater: Boolean = {
+    val parts = SPARK_VERSION.split("\\.").map(_.toInt)
+    (parts.head > 3) || (parts.head == 3 && parts(1) >= 1)
+  }
+  protected val isSparkV32OrGreater: Boolean = {
+    val parts = SPARK_VERSION.split("\\.").map(_.toInt)
+    (parts.head > 3) || (parts.head == 3 && parts(1) >= 2)
+  }
+
+  protected val extension: SparkSessionExtensions => Unit = _ => Unit
+  protected lazy val spark: SparkSession = {
+    val metastore = {
+      val path = Utils.createTempDir(namePrefix = "hms")
+      Files.delete(path)
+      path
+    }
+    SparkSession.builder()
+      .master("local")
+      .config("spark.ui.enabled", "false")
+      .config("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
+      .config("spark.sql.catalogImplementation", catalogImpl)
+      .config(
+        "spark.sql.warehouse.dir",
+        Utils.createTempDir(namePrefix = "spark-warehouse").toString)
+      .withExtensions(extension)
+      .getOrCreate()
+  }
+
+  protected val sql: String => DataFrame = spark.sql
+
+}
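
SparkSessionProvider centralizes the session bootstrap that was previously
duplicated across test suites; concrete suites supply the catalog
implementation and may override the injected extensions. A mixin sketch
(MySuite is illustrative; the real subclasses appear at the end of
RangerSparkExtensionSuite below):

    class MySuite extends KyuubiFunSuite with SparkSessionProvider {
      override protected val catalogImpl: String = "in-memory"
      override protected val extension: SparkSessionExtensions => Unit =
        new RangerSparkExtension
    }
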
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizerSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
similarity index 65%
rename from extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizerSuite.scala
rename to extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 4ec3422c1..e3962609c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkAuthorizerSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -22,23 +22,14 @@ import java.security.PrivilegedExceptionAction
 import scala.util.Try
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{Row, SparkSessionExtensions}
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
 
-class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
+abstract class RangerSparkExtensionSuite extends KyuubiFunSuite with SparkSessionProvider {
 
-  private lazy val spark: SparkSession = SparkSession.builder()
-    .master("local")
-    .withExtensions(new RangerSparkExtension)
-    .config("spark.ui.enabled", "false")
-    .config(
-      "spark.sql.warehouse.dir",
-      Utils.createTempDir(namePrefix = "spark-warehouse").toString)
-    .config("spark.sql.catalogImplementation", "in-memory")
-    .getOrCreate()
-
-  private val sql: String => DataFrame = spark.sql
+  override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension
 
   private def doAs[T](user: String, f: => T): T = {
     UserGroupInformation.createRemoteUser(user).doAs[T](
@@ -59,7 +50,7 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
     s"Permission denied: user [$user] does not have [$privilege] privilege on [$resource]"
   }
 
-  test("databases") {
+  test("auth: databases") {
     val testDb = "mydb"
     val create = s"CREATE DATABASE IF NOT EXISTS $testDb"
     val alter = s"ALTER DATABASE $testDb SET DBPROPERTIES (abc = '123')"
@@ -73,19 +64,19 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
       val e1 = intercept[RuntimeException](sql(alter))
       assert(e1.getMessage === errorMessage("alter", "mydb"))
       val e2 = intercept[RuntimeException](sql(drop))
-      assert(e2.getMessage === (errorMessage("drop", "mydb")))
+      assert(e2.getMessage === errorMessage("drop", "mydb"))
       doAs("kent", Try(sql("SHOW DATABASES")).isSuccess)
     } finally {
       doAs("admin", sql(drop))
     }
   }
 
-  test("tables") {
+  test("auth: tables") {
     val db = "default"
     val table = "src"
     val col = "key"
 
-    val create0 = s"CREATE TABLE IF NOT EXISTS $db.$table ($col int, value int) USING parquet"
+    val create0 = s"CREATE TABLE IF NOT EXISTS $db.$table ($col int, value int) USING $format"
     val alter0 = s"ALTER TABLE $db.$table SET TBLPROPERTIES(key='ak')"
     val drop0 = s"DROP TABLE IF EXISTS $db.$table"
     val select = s"SELECT * FROM $db.$table"
@@ -123,7 +114,7 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
     }
   }
 
-  test("functions") {
+  test("auth: functions") {
     val db = "default"
     val func = "func"
     val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'"
@@ -134,4 +125,53 @@ class RangerSparkAuthorizerSuite extends KyuubiFunSuite {
       })
     doAs("admin", assert(Try(sql(create0)).isSuccess))
   }
+
+  test("row level filter") {
+    val db = "default"
+    val table = "src"
+    val col = "key"
+    val create = s"CREATE TABLE IF NOT EXISTS $db.$table ($col int, value int) USING $format"
+    try {
+      doAs("admin", assert(Try { sql(create) }.isSuccess))
+      doAs("admin", sql(s"INSERT INTO $db.$table SELECT 1, 1"))
+      doAs("admin", sql(s"INSERT INTO $db.$table SELECT 20, 2"))
+      doAs("admin", sql(s"INSERT INTO $db.$table SELECT 30, 3"))
+
+      doAs(
+        "kent",
+        assert(sql(s"SELECT key FROM $db.$table order by key").collect() ===
+          Seq(Row(1), Row(20), Row(30))))
+
+      Seq(
+        s"SELECT value FROM $db.$table",
+        s"SELECT value as key FROM $db.$table",
+        s"SELECT max(value) FROM $db.$table",
+        s"SELECT coalesce(max(value), 1) FROM $db.$table",
+        s"SELECT value FROM $db.$table WHERE value in (SELECT value as key FROM $db.$table)")
+        .foreach { q =>
+          doAs(
+            "bob", {
+              withClue(q) {
+                assert(sql(q).collect() === Seq(Row(1)))
+              }
+            })
+        }
+      doAs(
+        "bob", {
+          sql(s"CREATE TABLE $db.src2 using $format AS SELECT value FROM $db.$table")
+          assert(sql(s"SELECT value FROM $db.${table}2").collect() === Seq(Row(1)))
+        })
+    } finally {
+      doAs("admin", sql(s"DROP TABLE IF EXISTS $db.${table}2"))
+      doAs("admin", sql(s"DROP TABLE IF EXISTS $db.$table"))
+    }
+  }
+}
+
+class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
+  override protected val catalogImpl: String = "in-memory"
+}
+
+class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
+  override protected val catalogImpl: String = "hive"
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
new file mode 100644
index 000000000..ae1679579
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.ranger
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
+import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
+
+class SparkRangerAdminPluginSuite extends KyuubiFunSuite {
+
+  test("get filter expression") {
+    val bob = UserGroupInformation.createRemoteUser("bob")
+    val are = AccessResource(ObjectType.TABLE, "default", "src", null)
+    def buildAccessRequest(ugi: UserGroupInformation): AccessRequest = {
+      AccessRequest(are, ugi, OperationType.QUERY, AccessType.SELECT)
+    }
+    val maybeString = getFilterExpr(buildAccessRequest(bob))
+    assert(maybeString.get === "key<20")
+    Seq("admin", "alice").foreach { user =>
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      val maybeString = getFilterExpr(buildAccessRequest(ugi))
+      assert(maybeString.isEmpty)
+    }
+  }
+}