Posted to commits@kyuubi.apache.org by ya...@apache.org on 2022/04/14 08:39:04 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1451] Support Data Column Masking
This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new fa95281be [KYUUBI #1451] Support Data Column Masking
fa95281be is described below
commit fa95281becf93e4d6856626a72696a512a465e66
Author: Kent Yao <ya...@apache.org>
AuthorDate: Thu Apr 14 16:38:54 2022 +0800
[KYUUBI #1451] Support Data Column Masking
### _Why are the changes needed?_
Support Data Column Masking
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #2352 from yaooqinn/datamask.
Closes #1451
0b7010c0 [Kent Yao] Support Data Column Masking
a27dc556 [Kent Yao] Support Data Column Masking
a5d825ac [Kent Yao] Support Data Column Masking
1b0bc5a5 [Kent Yao] Support Data Column Masking
ce4b5110 [Kent Yao] Support Data Column Masking
3a5798e3 [Kent Yao] Support Data Column Masking
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Kent Yao <ya...@apache.org>
---
extensions/spark/kyuubi-spark-authz/README.md | 2 +-
.../plugin/spark/authz/PrivilegesBuilder.scala | 11 +-
.../spark/authz/ranger/RangerSparkExtension.scala | 6 +-
...cala => RuleApplyRowFilterAndDataMasking.scala} | 34 ++-
.../authz/ranger/SparkRangerAdminPlugin.scala | 48 +++
.../plugin/spark/authz/util/AuthZUtils.scala | 56 ++++
...r.scala => RowFilterAndDataMaskingMarker.scala} | 2 +-
...ilterMarker.scala => RuleEliminateMarker.scala} | 4 +-
.../src/test/resources/sparkSql_hive_jenkins.json | 325 +++++++++++++++++++++
.../authz/ranger/RangerSparkExtensionSuite.scala | 69 +++++
.../authz/ranger/SparkRangerAdminPluginSuite.scala | 21 ++
11 files changed, 554 insertions(+), 24 deletions(-)
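
For context, a minimal sketch of how the new masking surfaces to an end user, assuming the src_value_hash test policy included below (MASK_HASH on column value1 of table default.src, applied to user bob) is in effect:

    -- run as user bob; the plugin rewrites value1 to md5(cast(value1 as string))
    SELECT value1 FROM default.src;
    -- returns md5 hex digests instead of the raw values

The query is an illustration only; the exact plan rewrite is shown in RuleApplyRowFilterAndDataMasking.scala below.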
diff --git a/extensions/spark/kyuubi-spark-authz/README.md b/extensions/spark/kyuubi-spark-authz/README.md
index 06bf2d2b9..1360b6dc9 100644
--- a/extensions/spark/kyuubi-spark-authz/README.md
+++ b/extensions/spark/kyuubi-spark-authz/README.md
@@ -21,7 +21,7 @@
- [x] Column-level fine-grained authorization
- [x] Row-level fine-grained authorization, a.k.a. Row-level filtering
-- [ ] Data masking
+- [x] Data masking
## Build
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index f275877b3..3e7d503d8 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.plugin.spark.authz
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -34,10 +33,6 @@ import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
object PrivilegesBuilder {
- private val versionParts = SPARK_VERSION.split('.')
- private val majorVersion: Int = versionParts.head.toInt
- private val minorVersion: Int = versionParts(1).toInt
-
private def quoteIfNeeded(part: String): String = {
if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {
part
@@ -231,7 +226,7 @@ object PrivilegesBuilder {
case "AnalyzeColumnCommand" =>
val table = getTableIdent
val cols =
- if (majorVersion >= 3) {
+ if (isSparkVersionAtLeast("3.0")) {
getPlanField[Option[Seq[String]]]("columnNames").getOrElse(Nil)
} else {
getPlanField[Seq[String]]("columnNames")
@@ -263,7 +258,7 @@ object PrivilegesBuilder {
buildQuery(query, inputObjs)
case "CacheTableCommand" =>
- if (majorVersion == 3 && minorVersion == 1) {
+ if (isSparkVersionEqualTo("3.1")) {
outputObjs += tablePrivileges(getMultipartIdentifier)
} else {
outputObjs += tablePrivileges(getTableIdent)
@@ -281,7 +276,7 @@ object PrivilegesBuilder {
val view = getPlanField[TableIdentifier]("name")
outputObjs += tablePrivileges(view)
val query =
- if (majorVersion < 3 || (majorVersion == 3 && minorVersion <= 1)) {
+ if (isSparkVersionAtMost("3.1")) {
getPlanField[LogicalPlan]("child")
} else {
getPlanField[LogicalPlan]("plan")
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
index 949f4260d..daaa6754a 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.kyuubi.plugin.spark.authz.util.RuleEliminateRowFilterMarker
+import org.apache.kyuubi.plugin.spark.authz.util.RuleEliminateMarker
/**
* ACL Management for Apache Spark SQL with Apache Ranger, enabling:
@@ -39,8 +39,8 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
SparkRangerAdminPlugin.init()
override def apply(v1: SparkSessionExtensions): Unit = {
- v1.injectResolutionRule(new RuleApplyRowFilter(_))
- v1.injectPostHocResolutionRule(_ => new RuleEliminateRowFilterMarker())
+ v1.injectResolutionRule(new RuleApplyRowFilterAndDataMasking(_))
+ v1.injectPostHocResolutionRule(_ => new RuleEliminateMarker())
v1.injectOptimizerRule(new RuleAuthorization(_))
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
similarity index 65%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
index 2e3b1454e..5ea1d2737 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilter.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleApplyRowFilterAndDataMasking.scala
@@ -19,44 +19,60 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.kyuubi.plugin.spark.authz.{ObjectType, OperationType}
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
-import org.apache.kyuubi.plugin.spark.authz.util.RowFilterMarker
+import org.apache.kyuubi.plugin.spark.authz.util.RowFilterAndDataMaskingMarker
+
+class RuleApplyRowFilterAndDataMasking(spark: SparkSession) extends Rule[LogicalPlan] {
-class RuleApplyRowFilter(spark: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
case hiveTableRelation if hasResolvedHiveTable(hiveTableRelation) =>
val table = getHiveTable(hiveTableRelation)
- applyFilter(hiveTableRelation, table, spark)
+ applyFilterAndMasking(hiveTableRelation, table, spark)
case logicalRelation if hasResolvedDatasourceTable(logicalRelation) =>
val table = getDatasourceTable(logicalRelation)
if (table.isEmpty) {
logicalRelation
} else {
- applyFilter(logicalRelation, table.get, spark)
+ applyFilterAndMasking(logicalRelation, table.get, spark)
}
}
}
- private def applyFilter(
+ private def applyFilterAndMasking(
plan: LogicalPlan,
table: CatalogTable,
spark: SparkSession): LogicalPlan = {
val identifier = table.identifier
val ugi = getAuthzUgi(spark.sparkContext)
val opType = OperationType(plan.nodeName)
+ val parse = spark.sessionState.sqlParser.parseExpression _
val are = AccessResource(ObjectType.TABLE, identifier.database.orNull, identifier.table, null)
val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
val filterExprStr = SparkRangerAdminPlugin.getFilterExpr(art)
+ val newOutput = plan.output.map { attr =>
+ val are =
+ AccessResource(ObjectType.COLUMN, identifier.database.orNull, identifier.table, attr.name)
+ val art = AccessRequest(are, ugi, opType, AccessType.SELECT)
+ val maskExprStr = SparkRangerAdminPlugin.getMaskingExpr(art)
+ if (maskExprStr.isEmpty) {
+ attr
+ } else {
+ val maskExpr = parse(maskExprStr.get)
+ Alias(maskExpr, attr.name)()
+ }
+ }
+
if (filterExprStr.isEmpty) {
- plan
+ Project(newOutput, RowFilterAndDataMaskingMarker(plan))
} else {
- val filterExpr = spark.sessionState.sqlParser.parseExpression(filterExprStr.get)
- Filter(filterExpr, RowFilterMarker(plan))
+ val filterExpr = parse(filterExprStr.get)
+ Project(newOutput, Filter(filterExpr, RowFilterAndDataMaskingMarker(plan)))
}
}
}
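
In plan terms, the rule above turns a table scan into a Project over an optional Filter over a marker node. A rough sketch of the resulting tree, assuming one masked column value1 and a row-filter expression key < 20 (both hypothetical policy outputs):

    Project [md5(cast(value1 as string)) AS value1, key]
    +- Filter (key < 20)
       +- RowFilterAndDataMaskingMarker
          +- HiveTableRelation default.src

Unmasked columns pass through as bare attributes; when Ranger returns no filter expression, the Filter node is simply omitted.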
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
index b1471b7e5..a72c2eb75 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPlugin.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.ranger.plugin.service.RangerBasePlugin
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
+
object SparkRangerAdminPlugin extends RangerBasePlugin("spark", "sparkSql") {
def getFilterExpr(req: AccessRequest): Option[String] = {
@@ -28,4 +30,50 @@ object SparkRangerAdminPlugin extends RangerBasePlugin("spark", "sparkSql") {
.map(_.getFilterExpr)
.filter(fe => fe != null && fe.nonEmpty)
}
+
+ def getMaskingExpr(req: AccessRequest): Option[String] = {
+ val col = req.getResource.asInstanceOf[AccessResource].getColumn
+ val result = evalDataMaskPolicies(req, null)
+ Option(result).filter(_.isMaskEnabled).map { res =>
+ if ("MASK_NULL".equalsIgnoreCase(res.getMaskType)) {
+ "NULL"
+ } else if ("CUSTOM".equalsIgnoreCase(result.getMaskType)) {
+ val maskVal = res.getMaskedValue
+ if (maskVal == null) {
+ "NULL"
+ } else {
+ s"${maskVal.replace("{col}", col)}"
+ }
+ } else if (result.getMaskTypeDef != null) {
+ result.getMaskTypeDef.getName match {
+ case "MASK" => regexp_replace(col)
+ case "MASK_SHOW_FIRST_4" if isSparkVersionAtLeast("3.1") =>
+ regexp_replace(col, hasLen = true)
+ case "MASK_SHOW_FIRST_4" =>
+ val right = regexp_replace(s"substr($col, 5)")
+ s"concat(substr($col, 0, 4), $right)"
+ case "MASK_SHOW_LAST_4" =>
+ val left = regexp_replace(s"left($col, length($col) - 4)")
+ s"concat($left, right($col, 4))"
+ case "MASK_HASH" => s"md5(cast($col as string))"
+ case "MASK_DATE_SHOW_YEAR" => s"date_trunc('YEAR', $col)"
+ case _ => result.getMaskTypeDef.getTransformer match {
+ case transformer if transformer != null && transformer.nonEmpty =>
+ s"${transformer.replace("{col}", col)}"
+ case _ => null
+ }
+ }
+ } else {
+ null
+ }
+ }
+ }
+
+ private def regexp_replace(expr: String, hasLen: Boolean = false): String = {
+ val pos = if (hasLen) ", 5" else ""
+ val upper = s"regexp_replace($expr, '[A-Z]', 'X'$pos)"
+ val lower = s"regexp_replace($upper, '[a-z]', 'x'$pos)"
+ val digits = s"regexp_replace($lower, '[0-9]', 'n'$pos)"
+ digits
+ }
}
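
As a concrete example of the regexp_replace helper, the MASK type on a column value2 expands to the nested expression below (matching the expectation in SparkRangerAdminPluginSuite further down):

    regexp_replace(regexp_replace(regexp_replace(value2, '[A-Z]', 'X'), '[a-z]', 'x'), '[0-9]', 'n')

Uppercase letters become X, lowercase become x, and digits become n. The hasLen = true variant appends ", 5" to each call so replacement only starts at the fifth character, which is why MASK_SHOW_FIRST_4 can use it directly on Spark 3.1+ (where regexp_replace accepts a position argument) and falls back to the substr/concat form on older versions.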
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index e2e691fb3..41bb0f888 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -20,6 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz.util
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -72,4 +73,59 @@ private[authz] object AuthZUtils {
def getDatasourceTable(plan: LogicalPlan): Option[CatalogTable] = {
getFieldVal[Option[CatalogTable]](plan, "catalogTable")
}
+
+ /**
+ * Given a Kyuubi/Spark/Hive version string,
+ * return the (major version number, minor version number).
+ * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
+ */
+ def majorMinorVersion(version: String): (Int, Int) = {
+ """^(\d+)\.(\d+)(\..*)?$""".r.findFirstMatchIn(version) match {
+ case Some(m) =>
+ (m.group(1).toInt, m.group(2).toInt)
+ case None =>
+ throw new IllegalArgumentException(s"Tried to parse '$version' as a project" +
+ s" version string, but it could not find the major and minor version numbers.")
+ }
+ }
+
+ /**
+ * Given a Kyuubi/Spark/Hive version string, return the major version number.
+ * E.g., for 2.0.1-SNAPSHOT, return 2.
+ */
+ def majorVersion(version: String): Int = majorMinorVersion(version)._1
+
+ /**
+ * Given a Kyuubi/Spark/Hive version string, return the minor version number.
+ * E.g., for 2.0.1-SNAPSHOT, return 0.
+ */
+ def minorVersion(version: String): Int = majorMinorVersion(version)._2
+
+ def isSparkVersionAtMost(ver: String): Boolean = {
+ val runtimeMajor = majorVersion(SPARK_VERSION)
+ val targetMajor = majorVersion(ver)
+ (runtimeMajor < targetMajor) || {
+ val runtimeMinor = minorVersion(SPARK_VERSION)
+ val targetMinor = minorVersion(ver)
+ runtimeMajor == targetMajor && runtimeMinor <= targetMinor
+ }
+ }
+
+ def isSparkVersionAtLeast(ver: String): Boolean = {
+ val runtimeMajor = majorVersion(SPARK_VERSION)
+ val targetMajor = majorVersion(ver)
+ (runtimeMajor > targetMajor) || {
+ val runtimeMinor = minorVersion(SPARK_VERSION)
+ val targetMinor = minorVersion(ver)
+ runtimeMajor == targetMajor && runtimeMinor >= targetMinor
+ }
+ }
+
+ def isSparkVersionEqualTo(ver: String): Boolean = {
+ val runtimeMajor = majorVersion(SPARK_VERSION)
+ val targetMajor = majorVersion(ver)
+ val runtimeMinor = minorVersion(SPARK_VERSION)
+ val targetMinor = minorVersion(ver)
+ runtimeMajor == targetMajor && runtimeMinor == targetMinor
+ }
}
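
A quick sketch of the new version helpers' semantics (the SPARK_VERSION value is hypothetical; only major.minor is compared, so patch levels and suffixes like -SNAPSHOT are ignored):

    majorMinorVersion("3.2.1")          // (3, 2)
    majorMinorVersion("2.0.1-SNAPSHOT") // (2, 0)

    // assuming SPARK_VERSION == "3.2.1":
    isSparkVersionAtLeast("3.1")  // true  (3.2 >= 3.1)
    isSparkVersionAtMost("3.1")   // false (3.2 >  3.1)
    isSparkVersionEqualTo("3.2")  // true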
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
similarity index 92%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
index 6b2e2d4d9..b6da24217 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RowFilterAndDataMaskingMarker.scala
@@ -20,6 +20,6 @@ package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-case class RowFilterMarker(table: LogicalPlan) extends LeafNode {
+case class RowFilterAndDataMaskingMarker(table: LogicalPlan) extends LeafNode {
override def output: Seq[Attribute] = table.output
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
similarity index 88%
rename from extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
rename to extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
index 4ec8bea38..77e4083ab 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateRowFilterMarker.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/RuleEliminateMarker.scala
@@ -20,8 +20,8 @@ package org.apache.kyuubi.plugin.spark.authz.util
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-class RuleEliminateRowFilterMarker extends Rule[LogicalPlan] {
+class RuleEliminateMarker extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transformUp { case rf: RowFilterMarker => rf.table }
+ plan.transformUp { case rf: RowFilterAndDataMaskingMarker => rf.table }
}
}
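
The marker is what keeps the resolution rule idempotent: RowFilterAndDataMaskingMarker is a LeafNode, so Catalyst's transformUp never descends into the wrapped table plan, and RuleApplyRowFilterAndDataMasking cannot match and rewrite the same relation twice across analyzer passes. A minimal sketch of the round trip, with hypothetical names:

    // during resolution: wrap the relation so it is hidden from repeated matching
    Project(newOutput, Filter(cond, RowFilterAndDataMaskingMarker(relation)))
    // post-hoc: RuleEliminateMarker strips the marker, restoring `relation` as the child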
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index 62a7bda2b..38297b229 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -521,6 +521,331 @@
"guid": "f588a9ed-f7b1-48f7-9d0d-c12cf2b9b7ed",
"isEnabled": true,
"version": 26
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "src_value_hash",
+ "policyType": 1,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default",
+ "spark_catalog"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "column": {
+ "values": [
+ "value1"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "src"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [
+ {
+ "dataMaskInfo": {
+ "dataMaskType": "MASK_HASH"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "rowFilterPolicyItems": [],
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 5,
+ "guid": "ed1868a1-bf79-4721-a3d5-6815cc7d4986",
+ "isEnabled": true,
+ "version": 1
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "src_value2_nullify",
+ "policyType": 1,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default",
+ "spark_catalog"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "column": {
+ "values": [
+ "value2"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "src"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [
+ {
+ "dataMaskInfo": {
+ "dataMaskType": "MASK"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "rowFilterPolicyItems": [],
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 6,
+ "guid": "98a04cd7-8d14-4466-adc9-126d87a3af69",
+ "isEnabled": true,
+ "version": 1
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "src_value3_sf4",
+ "policyType": 1,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default",
+ "spark_catalog"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "column": {
+ "values": [
+ "value3"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "src"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [
+ {
+ "dataMaskInfo": {
+ "dataMaskType": "MASK_SHOW_FIRST_4"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "rowFilterPolicyItems": [],
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 7,
+ "guid": "9d50a525-b24c-4cf5-a885-d10d426368d1",
+ "isEnabled": true,
+ "version": 1
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "src_value4_sf4",
+ "policyType": 1,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default",
+ "spark_catalog"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "column": {
+ "values": [
+ "value4"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "src"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [
+ {
+ "dataMaskInfo": {
+ "dataMaskType": "MASK_DATE_SHOW_YEAR"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "rowFilterPolicyItems": [],
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 8,
+ "guid": "9d50a526-b24c-4cf5-a885-d10d426368d1",
+ "isEnabled": true,
+ "version": 1
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "src_value5_show_last_4",
+ "policyType": 1,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default",
+ "spark_catalog"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "column": {
+ "values": [
+ "value5"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "src"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [
+ {
+ "dataMaskInfo": {
+ "dataMaskType": "MASK_SHOW_LAST_4"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "rowFilterPolicyItems": [],
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 32,
+ "guid": "b3f1f1e0-2bd6-4b20-8a32-a531006ae151",
+ "isEnabled": true,
+ "version": 1
}
],
"serviceDef": {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index e3962609c..631ed6e7d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -18,9 +18,11 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
import java.security.PrivilegedExceptionAction
+import java.sql.Timestamp
import scala.util.Try
+import org.apache.commons.codec.digest.DigestUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.{Row, SparkSessionExtensions}
@@ -166,6 +168,73 @@ abstract class RangerSparkExtensionSuite extends KyuubiFunSuite with SparkSessio
doAs("admin", sql(s"DROP TABLE IF EXISTS $db.$table"))
}
}
+
+ test("data masking") {
+ val db = "default"
+ val table = "src"
+ val col = "key"
+ val create =
+ s"CREATE TABLE IF NOT EXISTS $db.$table" +
+ s" ($col int, value1 int, value2 string, value3 string, value4 timestamp, value5 string)" +
+ s" USING $format"
+ try {
+ doAs("admin", assert(Try { sql(create) }.isSuccess))
+ doAs(
+ "admin",
+ sql(
+ s"INSERT INTO $db.$table SELECT 1, 1, 'hello', 'world', " +
+ s"timestamp'2018-11-17 12:34:56', 'World'"))
+ doAs(
+ "admin",
+ sql(
+ s"INSERT INTO $db.$table SELECT 20, 2, 'kyuubi', 'y', " +
+ s"timestamp'2018-11-17 12:34:56', 'world'"))
+ doAs(
+ "admin",
+ sql(
+ s"INSERT INTO $db.$table SELECT 30, 3, 'spark', 'a'," +
+ s" timestamp'2018-11-17 12:34:56', 'world'"))
+
+ doAs(
+ "kent",
+ assert(sql(s"SELECT key FROM $db.$table order by key").collect() ===
+ Seq(Row(1), Row(20), Row(30))))
+
+ Seq(
+ s"SELECT value1, value2, value3, value4, value5 FROM $db.$table",
+ s"SELECT value1 as key, value2, value3, value4, value5 FROM $db.$table",
+ s"SELECT max(value1), max(value2), max(value3), max(value4), max(value5) FROM $db.$table",
+ s"SELECT coalesce(max(value1), 1), coalesce(max(value2), 1), coalesce(max(value3), 1), " +
+ s"coalesce(max(value4), timestamp '2018-01-01 22:33:44'), coalesce(max(value5), 1) " +
+ s"FROM $db.$table",
+ s"SELECT value1, value2, value3, value4, value5 FROM $db.$table WHERE value2 in" +
+ s" (SELECT value2 as key FROM $db.$table)")
+ .foreach { q =>
+ doAs(
+ "bob", {
+ withClue(q) {
+ assert(sql(q).collect() ===
+ Seq(
+ Row(
+ DigestUtils.md5Hex("1"),
+ "xxxxx",
+ "worlx",
+ Timestamp.valueOf("2018-01-01 00:00:00"),
+ "Xorld")))
+ }
+ })
+ }
+ doAs(
+ "bob", {
+ sql(s"CREATE TABLE $db.src2 using $format AS SELECT value1 FROM $db.$table")
+ assert(sql(s"SELECT value1 FROM $db.${table}2").collect() ===
+ Seq(Row(DigestUtils.md5Hex("1"))))
+ })
+ } finally {
+ doAs("admin", sql(s"DROP TABLE IF EXISTS $db.${table}2"))
+ doAs("admin", sql(s"DROP TABLE IF EXISTS $db.$table"))
+ }
+ }
}
class InMemoryCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
index ae1679579..73d638136 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/SparkRangerAdminPluginSuite.scala
@@ -39,4 +39,25 @@ class SparkRangerAdminPluginSuite extends KyuubiFunSuite {
assert(maybeString.isEmpty)
}
}
+
+ test("get data masker") {
+ val bob = UserGroupInformation.createRemoteUser("bob")
+ def buildAccessRequest(ugi: UserGroupInformation, column: String): AccessRequest = {
+ val are = AccessResource(ObjectType.COLUMN, "default", "src", column)
+ AccessRequest(are, ugi, OperationType.QUERY, AccessType.SELECT)
+ }
+ assert(getMaskingExpr(buildAccessRequest(bob, "value1")).get === "md5(cast(value1 as string))")
+ assert(getMaskingExpr(buildAccessRequest(bob, "value2")).get ===
+ "regexp_replace(regexp_replace(regexp_replace(value2, '[A-Z]', 'X'), '[a-z]', 'x')," +
+ " '[0-9]', 'n')")
+ assert(getMaskingExpr(buildAccessRequest(bob, "value3")).get contains "regexp_replace")
+ assert(getMaskingExpr(buildAccessRequest(bob, "value4")).get === "date_trunc('YEAR', value4)")
+ assert(getMaskingExpr(buildAccessRequest(bob, "value5")).get contains "regexp_replace")
+
+ Seq("admin", "alice").foreach { user =>
+ val ugi = UserGroupInformation.createRemoteUser(user)
+ val maybeString = getMaskingExpr(buildAccessRequest(ugi, "value1"))
+ assert(maybeString.isEmpty)
+ }
+ }
}