Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/14 08:39:04 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1451] Support Data Column Masking

This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new fa95281be [KYUUBI #1451] Support Data Column Masking
fa95281be is described below

commit fa95281becf93e4d6856626a72696a512a465e66
Author: Kent Yao <ya...@apache.org>
AuthorDate: Thu Apr 14 16:38:54 2022 +0800

    [KYUUBI #1451] Support Data Column Masking
    
    ### _Why are the changes needed?_
    
    Support Data Column Masking
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #2352 from yaooqinn/datamask.
    
    Closes #1451
    
    0b7010c0 [Kent Yao] Support Data Column Masking
    a27dc556 [Kent Yao] Support Data Column Masking
    a5d825ac [Kent Yao] Support Data Column Masking
    1b0bc5a5 [Kent Yao] Support Data Column Masking
    ce4b5110 [Kent Yao] Support Data Column Masking
    3a5798e3 [Kent Yao] Support Data Column Masking
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Kent Yao <ya...@apache.org>
---
 extensions/spark/kyuubi-spark-authz/README.md      |   2 +-
 .../plugin/spark/authz/PrivilegesBuilder.scala     |  11 +-
 .../spark/authz/ranger/RangerSparkExtension.scala  |   6 +-
 ...cala => RuleApplyRowFilterAndDataMasking.scala} |  34 ++-
 .../authz/ranger/SparkRangerAdminPlugin.scala      |  48 +++
 .../plugin/spark/authz/util/AuthZUtils.scala       |  56 ++++
 ...r.scala => RowFilterAndDataMaskingMarker.scala} |   2 +-
 ...ilterMarker.scala => RuleEliminateMarker.scala} |   4 +-
 .../src/test/resources/sparkSql_hive_jenkins.json  | 325 +++++++++++++++++++++
 .../authz/ranger/RangerSparkExtensionSuite.scala   |  69 +++++
 .../authz/ranger/SparkRangerAdminPluginSuite.scala |  21 ++
 11 files changed, 554 insertions(+), 24 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/README.md b/extensions/spark/kyuubi-spark-authz/README.md
index 06bf2d2b9..1360b6dc9 100644
--- a/extensions/spark/kyuubi-spark-authz/README.md
+++ b/extensions/spark/kyuubi-spark-authz/README.md
@@ -21,7 +21,7 @@
 
 - [x] Column-level fine-grained authorization
 - [x] Row-level fine-grained authorization, a.k.a. Row-level filtering
-- [ ] Data masking
+- [x] Data masking
 
 ## Build
 
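For context, the authz plugin is wired in through Spark's standard extension mechanism. A minimal sketch of enabling it (assuming the kyuubi-spark-authz jar and the Ranger client configuration are already on the classpath; only the extension class name below comes from this repository, the session settings are illustrative):

    import org.apache.spark.sql.SparkSession

    // Hypothetical local session for trying the extension out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.extensions",
        "org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension")
      .getOrCreate()
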
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index f275877b3..3e7d503d8 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.plugin.spark.authz
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -34,10 +33,6 @@ import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 
 object PrivilegesBuilder {
 
-  private val versionParts = SPARK_VERSION.split('.')
-  private val majorVersion: Int = versionParts.head.toInt
-  private val minorVersion: Int = versionParts(1).toInt
-
   private def quoteIfNeeded(part: String): String = {
     if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
       part
@@ -231,7 +226,7 @@ object PrivilegesBuilder {
       case "AnalyzeColumnCommand" =>
         val table = getTableIdent
         val cols =
-          if (majorVersion >= 3) {
+          if (isSparkVersionAtLeast("3.0")) {
             getPlanField[Option[Seq[String]]]("columnNames").getOrElse(Nil)
           } else {
             getPlanField[Seq[String]]("columnNames")
@@ -263,7 +258,7 @@ object PrivilegesBuilder {
         buildQuery(query, inputObjs)
 
       case "CacheTableCommand" =>
-        if (majorVersion == 3 && minorVersion == 1) {
+        if (isSparkVersionEqualTo("3.1")) {
           outputObjs += tablePrivileges(getMultipartIdentifier)
         } else {
           outputObjs += tablePrivileges(getTableIdent)
@@ -281,7 +276,7 @@ object PrivilegesBuilder {
         val view = getPlanField[TableIdentifier]("name")
         outputObjs += tablePrivileges(view)
         val query =
-          if (majorVersion < 3 || (majorVersion == 3 && minorVersion <= 1)) {
+          if (isSparkVersionAtMost("3.1")) {
             getPlanField[LogicalPlan]("child")
           } else {
             getPlanField[LogicalPlan]("plan")
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
index 949f4260d..daaa6754a 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.spark.sql.SparkSessionExtensions
 
-import org.apache.kyuubi.plugin.spark.authz.util.RuleEliminateRowFilterMarker
+import org.apache.kyuubi.plugin.spark.authz.util.RuleEliminateMarker
 
 /**
  * ACL Management for Apache Spark SQL with Apache Ranger, enabling:
@@ -39,8 +39,8 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
   SparkRangerAdminPlugin.init()
 
   override def apply(v1: SparkSessionExtensions): Unit = {
-    v1.injectResolutionRule(new RuleApplyRowFilter(_))
-    v1.injectPostHocResolutionRule(_ => new RuleEliminateRowFilterMarker())
+    v1.injectResolutionRule(new RuleApplyRowFilterAndDataMasking(_))
+    v1.injectPostHocResolutionRule(_ => new RuleEliminateMarker())
     v1.injectOptimizerRule(new RuleAuthorization(_))
   }
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
similarity index 65%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
index 2e3b1454e..5ea1d2737 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
@@ -19,44 +19,60 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
-import org.apache.kyuubi.plugin.spark.authz.util.RowFilterMarker
+import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker
+
+class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
 
-class RuleApplyRowFilter(spark: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
         val table = getHiveTable(hiveTableRelation)
-        applyFilter(hiveTableRelation, table, spark)
+        applyFilterAndMasking(hiveTableRelation, table, spark)
       case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
         val table = getDatasourceTable(logicalRelation)
         if (table.isEmpty) {
           logicalRelation
         } else {
-          applyFilter(logicalRelation, table.get, spark)
+          applyFilterAndMasking(logicalRelation, table.get, spark)
         }
     }
   }
 
-  private def applyFilter(
+  private def applyFilterAndMasking(
       plan: LogicalPlan,
       table: CatalogTable,
       spark: SparkSession): LogicalPlan = {
     val identifier = table.identifier
     val ugi = getAuthzUgi(spark.sparkContext)
     val opType = OperationType(plan.nodeName)
+    val parse = spark.sessionState.sqlParser.parseExpression _
     val are = AccessResource(ObjectType.TABLE, identifier.database.orNull, identifier.table, null)
     val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
     val filterExprStr = SparkRangerAdminPlugin.getFilterExpr(art)
+    val newOutput = plan.output.map { attr =>
+      val are =
+        AccessResource(ObjectType.COLUMN, identifier.database.orNull, identifier.table, attr.name)
+      val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
+      val maskExprStr = SparkRangerAdminPlugin.getMaskingExpr(art)
+      if (maskExprStr.isEmpty) {
+        attr
+      } else {
+        val maskExpr = parse(maskExprStr.get)
+        Alias(maskExpr, attr.name)()
+      }
+    }
+
     if (filterExprStr.isEmpty) {
-      plan
+      Project(newOutput, RowFilterAndDataMaskingMarker(plan))
     } else {
-      val filterExpr = spark.sessionState.sqlParser.parseExpression(filterExprStr.get)
-      Filter(filterExpr, RowFilterMarker(plan))
+      val filterExpr = parse(filterExprStr.get)
+      Project(newOutput, Filter(filterExpr, RowFilterAndDataMaskingMarker(plan)))
     }
   }
 }
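To make the rewrite above concrete: the rule asks Ranger for one row-filter expression per table and one masking expression per output column, then rebuilds the plan. A rough sketch of the resulting tree for a hypothetical table default.src(name string) with a masking policy on name and a row filter name != 'secret' (both policies invented for illustration):

    Project [regexp_replace(..., name) AS name]
    +- Filter (name != 'secret')
       +- RowFilterAndDataMaskingMarker
          +- HiveTableRelation `default`.`src`

When no row filter applies, the filterExprStr.isEmpty branch skips the Filter and the Project sits directly on the marker; unmasked columns pass through as their original attributes.
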
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
index b1471b7e5..a72c2eb75 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import org.apache.ranger.plugin.service.RangerBasePlugin
 
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
+
 object SparkRangerAdminPlugin extends RangerBasePlugin("spark", "sparkSql") {
 
   def getFilterExpr(req: AccessRequest): Option[String] = {
@@ -28,4 +30,50 @@ object SparkRangerAdminPlugin extends RangerBasePlugin("spark", "sparkSql") {
       .map(_.getFilterExpr)
       .filter(fe => fe != null && fe.nonEmpty)
   }
+
+  def getMaskingExpr(req: AccessRequest): Option[String] = {
+    val col = req.getResource.asInstanceOf[AccessResource].getColumn
+    val result = evalDataMaskPolicies(req, null)
+    Option(result).filter(_.isMaskEnabled).map { res =>
+      if ("MASK_NULL".equalsIgnoreCase(res.getMaskType)) {
+        "NULL"
+      } else if ("CUSTOM".equalsIgnoreCase(result.getMaskType)) {
+        val maskVal = res.getMaskedValue
+        if (maskVal == null) {
+          "NULL"
+        } else {
+          s"${maskVal.replace("{col}", col)}"
+        }
+      } else if (result.getMaskTypeDef != null) {
+        result.getMaskTypeDef.getName match {
+          case "MASK" => regexp_replace(col)
+          case "MASK_SHOW_FIRST_4" if isSparkVersionAtLeast("3.1") =>
+            regexp_replace(col, hasLen = true)
+          case "MASK_SHOW_FIRST_4" =>
+            val right = regexp_replace(s"substr($col, 5)")
+            s"concat(substr($col, 0, 4), $right)"
+          case "MASK_SHOW_LAST_4" =>
+            val left = regexp_replace(s"left($col, length($col) - 4)")
+            s"concat($left, right($col, 4))"
+          case "MASK_HASH" => s"md5(cast($col as string))"
+          case "MASK_DATE_SHOW_YEAR" => s"date_trunc('YEAR', $col)"
+          case _ => result.getMaskTypeDef.getTransformer match {
+              case transformer if transformer != null && transformer.nonEmpty =>
+                s"${transformer.replace("{col}", col)}"
+              case _ => null
+            }
+        }
+      } else {
+        null
+      }
+    }
+  }
+
+  private def regexp_replace(expr: String, hasLen: Boolean = false): String = {
+    val pos = if (hasLen) ", 5" else ""
+    val upper = s"regexp_replace($expr, '[A-Z]', 'X'$pos)"
+    val lower = s"regexp_replace($upper, '[a-z]', 'x'$pos)"
+    val digits = s"regexp_replace($lower, '[0-9]', 'n'$pos)"
+    digits
+  }
 }
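The private regexp_replace helper stacks three replacements: uppercase letters become 'X', lowercase become 'x', digits become 'n'; on Spark 3.1+ an extra position argument (", 5") limits the replacement to characters from position 5 onward. A standalone sketch of the same string-building logic, runnable without Spark or Ranger; the printed value matches the MASK expectation asserted in SparkRangerAdminPluginSuite at the end of this diff:

    // Minimal reproduction of the chain built by the regexp_replace helper above.
    object MaskExprSketch extends App {
      def regexpReplaceChain(expr: String, hasLen: Boolean = false): String = {
        val pos = if (hasLen) ", 5" else "" // position arg, used only on Spark 3.1+
        val upper = s"regexp_replace($expr, '[A-Z]', 'X'$pos)"
        val lower = s"regexp_replace($upper, '[a-z]', 'x'$pos)"
        s"regexp_replace($lower, '[0-9]', 'n'$pos)"
      }

      // prints: regexp_replace(regexp_replace(regexp_replace(value2,
      //   '[A-Z]', 'X'), '[a-z]', 'x'), '[0-9]', 'n')
      println(regexpReplaceChain("value2"))
    }
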
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index e2e691fb3..41bb0f888 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz.util
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -72,4 +73,59 @@ private[authz] object AuthZUtils {
   def getDatasourceTable(plan: LogicalPlan): Option[CatalogTable] = {
     getFieldVal[Option[CatalogTable]](plan, "catalogTable")
   }
+
+  /**
+   * Given a Kyuubi/Spark/Hive version string,
+   * return the (major version number, minor version number).
+   * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
+   */
+  def majorMinorVersion(version: String): (Int, Int) = {
+    """^(\d+)\.(\d+)(\..*)?$""".r.findFirstMatchIn(version) match {
+      case Some(m) =>
+        (m.group(1).toInt, m.group(2).toInt)
+      case None =>
+        throw new IllegalArgumentException(s"Tried to parse '$version' as a project" +
+          s" version string, but it could not find the major and minor version numbers.")
+    }
+  }
+
+  /**
+   * Given a Kyuubi/Spark/Hive version string, return the major version number.
+   * E.g., for 2.0.1-SNAPSHOT, return 2.
+   */
+  def majorVersion(version: String): Int = majorMinorVersion(version)._1
+
+  /**
+   * Given a Kyuubi/Spark/Hive version string, return the minor version number.
+   * E.g., for 2.0.1-SNAPSHOT, return 0.
+   */
+  def minorVersion(version: String): Int = majorMinorVersion(version)._2
+
+  def isSparkVersionAtMost(ver: String): Boolean = {
+    val runtimeMajor = majorVersion(SPARK_VERSION)
+    val targetMajor = majorVersion(ver)
+    (runtimeMajor < targetMajor) || {
+      val runtimeMinor = minorVersion(SPARK_VERSION)
+      val targetMinor = minorVersion(ver)
+      runtimeMajor == targetMajor && runtimeMinor <= targetMinor
+    }
+  }
+
+  def isSparkVersionAtLeast(ver: String): Boolean = {
+    val runtimeMajor = majorVersion(SPARK_VERSION)
+    val targetMajor = majorVersion(ver)
+    (runtimeMajor > targetMajor) || {
+      val runtimeMinor = minorVersion(SPARK_VERSION)
+      val targetMinor = minorVersion(ver)
+      runtimeMajor == targetMajor && runtimeMinor >= targetMinor
+    }
+  }
+
+  def isSparkVersionEqualTo(ver: String): Boolean = {
+    val runtimeMajor = majorVersion(SPARK_VERSION)
+    val targetMajor = majorVersion(ver)
+    val runtimeMinor = minorVersion(SPARK_VERSION)
+    val targetMinor = minorVersion(ver)
+    runtimeMajor == targetMajor && runtimeMinor == targetMinor
+  }
 }
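The helpers above compare only the (major, minor) pair, so patch levels and qualifiers such as -SNAPSHOT are ignored. A standalone sketch of the parsing semantics (the regex is copied from majorMinorVersion; the runtime version in the comments is assumed, not read from Spark):

    object VersionSketch extends App {
      // Same pattern as AuthZUtils.majorMinorVersion above.
      def majorMinorVersion(version: String): (Int, Int) =
        """^(\d+)\.(\d+)(\..*)?$""".r.findFirstMatchIn(version) match {
          case Some(m) => (m.group(1).toInt, m.group(2).toInt)
          case None => throw new IllegalArgumentException(s"cannot parse '$version'")
        }

      println(majorMinorVersion("2.0.1-SNAPSHOT")) // (2,0)
      // If SPARK_VERSION were "3.2.1": isSparkVersionAtLeast("3.1") -> true,
      // isSparkVersionAtMost("3.1") -> false, isSparkVersionEqualTo("3.2") -> true.
    }
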
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
similarity index 92%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
index 6b2e2d4d9..b6da24217 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
@@ -20,6 +20,6 @@ package org.apache.kyuubi.plugin.spark.authz.util
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
-case class RowFilterMarker(table: LogicalPlan) extends LeafNode {
+case class RowFilterAndDataMaskingMarker(table: LogicalPlan) extends LeafNode {
   override def output: Seq[Attribute] = table.output
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
similarity index 88%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
index 4ec8bea38..77e4083ab 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
@@ -20,8 +20,8 @@ package org.apache.kyuubi.plugin.spark.authz.util
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
-class RuleEliminateRowFilterMarker extends Rule[LogicalPlan] {
+class RuleEliminateMarker extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUp { case rf: RowFilterMarker => rf.table }
+    plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.table }
   }
 }
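Taken together, the two utilities implement a simple marker pattern: RowFilterAndDataMaskingMarker is a LeafNode that merely forwards the underlying table's output, planted beneath the injected Filter/Project during resolution, and RuleEliminateMarker, registered as a post-hoc resolution rule, unwraps it again so the marker never reaches the optimizer or execution. The renames from the row-filter-specific class names reflect that the same marker now serves both row filtering and data masking.
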
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index 62a7bda2b..38297b229 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -521,6 +521,331 @@
       "guid": "f588a9ed-f7b1-48f7-9d0d-c12cf2b9b7ed",
       "isEnabled": true,
       "version": 26
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "src_value_hash",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default",
+            "spark_catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value1"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "src"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_HASH"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 5,
+      "guid": "ed1868a1-bf79-4721-a3d5-6815cc7d4986",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "src_value2_nullify",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default",
+            "spark_catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value2"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "src"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 6,
+      "guid": "98a04cd7-8d14-4466-adc9-126d87a3af69",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "src_value3_sf4",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default",
+            "spark_catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value3"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "src"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_SHOW_FIRST_4"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 7,
+      "guid": "9d50a525-b24c-4cf5-a885-d10d426368d1",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "src_value4_sf4",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default",
+            "spark_catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value4"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "src"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_DATE_SHOW_YEAR"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 8,
+      "guid": "9d50a526-b24c-4cf5-a885-d10d426368d1",
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "src_value5_show_last_4",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default",
+            "spark_catalog"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value5"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "src"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_SHOW_LAST_4"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 32,
+      "guid": "b3f1f1e0-2bd6-4b20-8a32-a531006ae151",
+      "isEnabled": true,
+      "version": 1
     }
   ],
   "serviceDef": {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index e3962609c..631ed6e7d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -18,9 +18,11 @@
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
 import java.security.PrivilegedExceptionAction
+import java.sql.Timestamp
 
 import scala.util.Try
 
+import org.apache.commons.codec.digest.DigestUtils
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.sql.{Row, SparkSessionExtensions}
 
@@ -166,6 +168,73 @@ abstract class RangerSparkExtensionSuite extends KyuubiFunSuite with SparkSessio
       doAs("admin", sql(s"DROP TABLE IF EXISTS $db.$table"))
     }
   }
+
+  test("data masking") {
+    val db = "default"
+    val table = "src"
+    val col = "key"
+    val create =
+      s"CREATE TABLE IF NOT EXISTS $db.$table" +
+        s" ($col int, value1 int, value2 string, value3 string, value4 timestamp, value5 string)" +
+        s" USING $format"
+    try {
+      doAs("admin", assert(Try { sql(create) }.isSuccess))
+      doAs(
+        "admin",
+        sql(
+          s"INSERT INTO $db.$table SELECT 1, 1, 'hello', 'world', " +
+            s"timestamp'2018-11-17 12:34:56', 'World'"))
+      doAs(
+        "admin",
+        sql(
+          s"INSERT INTO $db.$table SELECT 20, 2, 'kyuubi', 'y', " +
+            s"timestamp'2018-11-17 12:34:56', 'world'"))
+      doAs(
+        "admin",
+        sql(
+          s"INSERT INTO $db.$table SELECT 30, 3, 'spark', 'a'," +
+            s" timestamp'2018-11-17 12:34:56', 'world'"))
+
+      doAs(
+        "kent",
+        assert(sql(s"SELECT key FROM $db.$table order by key").collect() ===
+          Seq(Row(1), Row(20), Row(30))))
+
+      Seq(
+        s"SELECT value1, value2, value3, value4, value5 FROM $db.$table",
+        s"SELECT value1 as key, value2, value3, value4, value5 FROM $db.$table",
+        s"SELECT max(value1), max(value2), max(value3), max(value4), max(value5) FROM $db.$table",
+        s"SELECT coalesce(max(value1), 1), coalesce(max(value2), 1), coalesce(max(value3), 1), " +
+          s"coalesce(max(value4), timestamp '2018-01-01 22:33:44'), coalesce(max(value5), 1) " +
+          s"FROM $db.$table",
+        s"SELECT value1, value2, value3, value4, value5 FROM $db.$table WHERE value2 in" +
+          s" (SELECT value2 as key FROM $db.$table)")
+        .foreach { q =>
+          doAs(
+            "bob", {
+              withClue(q) {
+                assert(sql(q).collect() ===
+                  Seq(
+                    Row(
+                      DigestUtils.md5Hex("1"),
+                      "xxxxx",
+                      "worlx",
+                      Timestamp.valueOf("2018-01-01 00:00:00"),
+                      "Xorld")))
+              }
+            })
+        }
+      doAs(
+        "bob", {
+          sql(s"CREATE TABLE $db.src2 using $format AS SELECT value1 FROM $db.$table")
+          assert(sql(s"SELECT value1 FROM $db.${table}2").collect() ===
+            Seq(Row(DigestUtils.md5Hex("1"))))
+        })
+    } finally {
+      doAs("admin", sql(s"DROP TABLE IF EXISTS $db.${table}2"))
+      doAs("admin", sql(s"DROP TABLE IF EXISTS $db.$table"))
+    }
+  }
 }
 
 class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
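
The single expected row in the masking test lines up with the fixture policies one column at a time: value1 (MASK_HASH) becomes the md5 hex of the string "1"; value2 (MASK) turns 'hello' into 'xxxxx'; value3 (MASK_SHOW_FIRST_4) keeps 'worl' and masks the tail of 'world', giving 'worlx'; value4 (MASK_DATE_SHOW_YEAR) truncates the timestamp to 2018-01-01 00:00:00; and value5 (MASK_SHOW_LAST_4) masks 'World' to 'Xorld'. Only one row comes back for bob, evidently because the fixture's pre-existing row-filter policy already restricts his view of src. The final CREATE TABLE ... AS SELECT check confirms that masked values, not raw ones, are what get written when bob materializes the data.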
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
index ae1679579..73d638136 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
@@ -39,4 +39,25 @@ class SparkRangerAdminPluginSuite extends KyuubiFunSuite {
       assert(maybeString.isEmpty)
     }
   }
+
+  test("get data masker") {
+    val bob = UserGroupInformation.createRemoteUser("bob")
+    def buildAccessRequest(ugi: UserGroupInformation, column: String): AccessRequest = {
+      val are = AccessResource(ObjectType.COLUMN, "default", "src", column)
+      AccessRequest(are, ugi, OperationType.QUERY, AccessType.SELECT)
+    }
+    assert(getMaskingExpr(buildAccessRequest(bob, "value1")).get === "md5(cast(value1 as string))")
+    assert(getMaskingExpr(buildAccessRequest(bob, "value2")).get ===
+      "regexp_replace(regexp_replace(regexp_replace(value2, '[A-Z]', 'X'), '[a-z]', 'x')," +
+      " '[0-9]', 'n')")
+    assert(getMaskingExpr(buildAccessRequest(bob, "value3")).get contains "regexp_replace")
+    assert(getMaskingExpr(buildAccessRequest(bob, "value4")).get === "date_trunc('YEAR', value4)")
+    assert(getMaskingExpr(buildAccessRequest(bob, "value5")).get contains "regexp_replace")
+
+    Seq("admin", "alice").foreach { user =>
+      val ugi = UserGroupInformation.createRemoteUser(user)
+      val maybeString = getMaskingExpr(buildAccessRequest(ugi, "value1"))
+      assert(maybeString.isEmpty)
+    }
+  }
 }