You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/17 14:18:58 UTC
[submarine] branch master updated: SUBMARINE-434. Refine data
masking plan resolution for spark security
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 40cd081 SUBMARINE-434. Refine data masking plan resolution for spark security
40cd081 is described below
commit 40cd08109cadf50e34f8cb862a21dd7699ce9c73
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Tue Mar 17 21:07:51 2020 +0800
SUBMARINE-434. Refine data masking plan resolution for spark security
### What is this PR for?
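Refine data masking plan resolution for Spark security.
Instead of building and analyzing a full `SELECT ... FROM <table>` statement for every masked column, we now use `parseExpression` to parse the masking expression directly and wrap it in an alias for the masked output column, which is neater.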
### What type of PR is it?
Refactoring
### Todos
* [ ] - Task
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-434
### How should this be tested?
Added tests to DataMaskingSQLTest.
### Screenshots (if appropriate)
### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Kent Yao <ya...@hotmail.com>
Closes #233 from yaooqinn/SUBMARINE-434 and squashes the following commits:
a4cc16b [Kent Yao] SUBMARINE-434. Refine data masking plan resolution for spark security
---
.../optimizer/SubmarineDataMaskingExtension.scala | 149 +++++++++++----------
.../spark/security/RangerSparkAuditHandler.scala | 2 +-
.../spark/security/DataMaskingSQLTest.scala | 20 +++
3 files changed, 100 insertions(+), 71 deletions(-)
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 094abc7..a829eae 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -23,25 +23,26 @@ import scala.collection.mutable
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.ranger.plugin.model.RangerPolicy
import org.apache.ranger.plugin.policyengine.RangerAccessResult
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project, SubmarineDataMasking, Subquery}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateViewCommand, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
-
-import org.apache.submarine.spark.security.{RangerSparkAccessRequest, RangerSparkAuditHandler, RangerSparkPlugin, RangerSparkResource, SparkAccessType}
+import org.apache.submarine.spark.security._
+import org.apache.submarine.spark.security.SparkObjectType.COLUMN
/**
* An Apache Spark's [[Optimizer]] extension for column data masking.
+ * TODO(kent yao) implement this as analyzer rule
*/
case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[LogicalPlan] {
- import RangerPolicy._
+ import org.apache.ranger.plugin.model.RangerPolicy._
// register all built-in masking udfs
Map("mask" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMask",
@@ -51,12 +52,71 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
"mask_show_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowFirstN",
"mask_show_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowLastN")
.map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
- .foreach(spark.sessionState.catalog.registerFunction(_, true))
+ .foreach(spark.sessionState.catalog.registerFunction(_, overrideIfExists = true))
private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
private lazy val sqlParser = spark.sessionState.sqlParser
private lazy val analyzer = spark.sessionState.analyzer
- private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+ private lazy val auditHandler = RangerSparkAuditHandler()
+ private def currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
+
+ /**
+ * Get RangerAccessResult from ranger admin or local policies, which contains data masking rules
+ */
+ private def getAccessResult(identifier: TableIdentifier, attr: Attribute): RangerAccessResult = {
+ val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, attr.name)
+ val req = new RangerSparkAccessRequest(
+ resource,
+ currentUser.getShortUserName,
+ currentUser.getGroupNames.toSet,
+ COLUMN.toString,
+ SparkAccessType.SELECT,
+ sparkPlugin.getClusterName)
+ sparkPlugin.evalDataMaskPolicies(req, auditHandler)
+ }
+
+ /**
+ * Generate an [[Alias]] expression with the access result and original expression, which can be
+ * used to replace the original output of the query.
+ *
+ * This alias contains a child, which might be null literal or [[UnresolvedFunction]]. When the
+ * child is function, it replace the argument which is [[UnresolvedAttribute]] with the input
+ * attribute to resolve directly.
+ */
+ private def getMasker(attr: Attribute, result: RangerAccessResult): Alias = {
+ val expr = if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
+ "NULL"
+ } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
+ val maskVal = result.getMaskedValue
+ if (maskVal == null) {
+ "NULL"
+ } else {
+ s"${maskVal.replace("{col}", attr.name)}"
+ }
+ } else if (result.getMaskTypeDef != null) {
+ val transformer = result.getMaskTypeDef.getTransformer
+ if (StringUtils.isNotEmpty(transformer)) {
+ s"${transformer.replace("{col}", attr.name)}"
+ } else {
+ return null
+ }
+ } else {
+ return null
+ }
+
+ // sql expression text -> UnresolvedFunction
+ val parsed = sqlParser.parseExpression(expr)
+
+ // Here we replace the attribute with the resolved one, e.g.
+ // 'mask_show_last_n('value, 4, x, x, x, -1, 1)
+ // ->
+ // 'mask_show_last_n(value#37, 4, x, x, x, -1, 1)
+ val resolved = parsed mapChildren {
+ case _: UnresolvedAttribute => attr
+ case o => o
+ }
+ Alias(resolved, attr.name)()
+ }
/**
* Collecting transformers from Ranger data masking policies, and mapping the to the
@@ -70,65 +130,14 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
plan: LogicalPlan,
table: CatalogTable,
aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
- val auditHandler = new RangerSparkAuditHandler()
- val ugi = UserGroupInformation.getCurrentUser
- val userName = ugi.getShortUserName
- val groups = ugi.getGroupNames.toSet
try {
- val identifier = table.identifier
- import org.apache.submarine.spark.security.SparkObjectType._
-
val maskEnableResults = plan.output.map { expr =>
- val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, expr.name)
- val req = new RangerSparkAccessRequest(resource, userName, groups, COLUMN.toString,
- SparkAccessType.SELECT, sparkPlugin.getClusterName)
- (expr, sparkPlugin.evalDataMaskPolicies(req, auditHandler))
+ expr -> getAccessResult(table.identifier, expr)
}.filter(x => isMaskEnabled(x._2))
- val originMaskers = maskEnableResults.map { case (expr, result) =>
- if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
- val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
- val plan = analyzer.execute(sqlParser.parsePlan(sql))
- (expr, plan)
- } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
- val maskVal = result.getMaskedValue
- if (maskVal == null) {
- val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
- val plan = analyzer.execute(sqlParser.parsePlan(sql))
- (expr, plan)
- } else {
- val sql = s"SELECT ${maskVal.replace("{col}", expr.name)} AS ${expr.name} FROM" +
- s" ${table.qualifiedName}"
- val plan = analyzer.execute(sqlParser.parsePlan(sql))
- (expr, plan)
- }
- } else if (result.getMaskTypeDef != null) {
- val transformer = result.getMaskTypeDef.getTransformer
- if (StringUtils.isNotEmpty(transformer)) {
- val trans = transformer.replace("{col}", expr.name)
- val sql = s"SELECT $trans AS ${expr.name} FROM ${table.qualifiedName}"
- val plan = analyzer.execute(sqlParser.parsePlan(sql))
- (expr, plan)
- } else {
- (expr, null)
- }
- } else {
- (expr, null)
- }
- }.filter(_._2 != null)
-
- val formedMaskers: Map[ExprId, Alias] =
- originMaskers.map { case (expr, p) => (expr, p.asInstanceOf[Project].projectList.head) }
- .map { case (expr, attr) =>
- val originalAlias = attr.asInstanceOf[Alias]
- val newChild = originalAlias.child mapChildren {
- case _: AttributeReference => expr
- case o => o
- }
- val newAlias = originalAlias.copy(child = newChild)(
- originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
- (expr.exprId, newAlias)
- }.toMap
+ val formedMaskers = maskEnableResults.map { case (expr, result) =>
+ expr.exprId -> getMasker(expr, result)
+ }.filter(_._2 != null).toMap
val aliasedMaskers = new mutable.HashMap[ExprId, Alias]()
for ((alias, id) <- aliases if formedMaskers.contains(id)) {
@@ -138,10 +147,10 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
ar.copy(name = alias.name)(alias.exprId, alias.qualifier)
case o => o
}
- val newAlias = originalAlias.copy(child = newChild, alias.name)(
- originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+ val newAlias = Alias(newChild, alias.name)()
aliasedMaskers.put(alias.exprId, newAlias)
}
+
formedMaskers ++ aliasedMaskers
} catch {
case e: Exception => throw e
@@ -199,14 +208,14 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
plan
}
- val marked = newPlan transformUp {
+ // Call spark analysis here explicitly to resolve UnresolvedFunctions
+ val marked = analyzer.execute(newPlan) transformUp {
case p if hasCatalogTable(p) => SubmarineDataMasking(p)
}
marked transformAllExpressions {
case s: SubqueryExpression =>
- val Subquery(newPlan) =
- rangerSparkOptimizer.execute(Subquery(SubmarineDataMasking(s.plan)))
+ val Subquery(newPlan) = Subquery(SubmarineDataMasking(s.plan))
s.withNewPlan(newPlan)
}
}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala
index 32eb351..fff830a 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala
@@ -21,7 +21,7 @@ package org.apache.submarine.spark.security
import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler
-class RangerSparkAuditHandler extends RangerDefaultAuditHandler {
+case class RangerSparkAuditHandler() extends RangerDefaultAuditHandler {
// TODO(Kent Yao): Implementing meaningfully audit functions
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
index b2de098..f8b6dff 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -113,6 +113,16 @@ case class DataMaskingSQLTest() extends FunSuite with BeforeAndAfterAll {
}
}
+ test("alias, non-alias coexists") {
+ val statement = "select key as k1, value, value v1 from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("x"), "values should be masked")
+ assert(row.getString(2).startsWith("x"), "values should be masked")
+ }
+ }
+
test("agg") {
val statement = "select sum(key) as k1, value v1 from default.src group by v1"
withUser("bob") {
@@ -158,6 +168,16 @@ case class DataMaskingSQLTest() extends FunSuite with BeforeAndAfterAll {
}
}
+ test("MASK_NULL") {
+ val statement = "select * from default.rangertbl4 where value = 'val_277'"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1) === null, "value is hashed")
+ }
+ }
+
test("MASK_SHOW_LAST_4") {
val statement = "select * from default.rangertbl5 where value = 'val_277'"
withUser("bob") {
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org