You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/11 06:57:34 UTC
[submarine] branch master updated: SUBMARINE-418. Support Data
Masking for spark-security
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 97d6dfc SUBMARINE-418. Support Data Masking for spark-security
97d6dfc is described below
commit 97d6dfc9e1c24f3869e6802a4afea58472b75ec4
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Wed Mar 11 14:21:42 2020 +0800
SUBMARINE-418. Support Data Masking for spark-security
### What is this PR for?
add data masking for spark-security, which enables administrators to mask sensitive data per column
e.g. the phone number 12345678901 can be masked to show only the last 4 digits - nnnnnnn8901
### What type of PR is it?
feature
### Todos
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-418
### How should this be tested?
add unit tests
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Kent Yao <ya...@hotmail.com>
Closes #219 from yaooqinn/SUBMARINE-418 and squashes the following commits:
d2a70ea [Kent Yao] nit
95cc726 [Kent Yao] SUBMARINE-418. Support Data Masking
---
.../optimizer/SubmarineDataMaskingExtension.scala | 227 +++++++++++++++++++++
.../plans/logical/SubmarineDataMasking.scala} | 18 +-
.../execution/SubmarineSparkPlanOmitStrategy.scala | 5 +-
.../spark/security/RangerSparkSQLExtension.scala | 3 +-
.../src/test/resources/sparkSql_hive_jenkins.json | 64 ++++++
.../org/apache/spark/sql/SubmarineSparkUtils.scala | 7 +-
.../SubmarineDataMaskingExtensionTest.scala | 48 +++++
.../spark/security/DataMaskingSQLTest.scala | 189 +++++++++++++++++
8 files changed, 546 insertions(+), 15 deletions(-)
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
new file mode 100644
index 0000000..7e2b99c
--- /dev/null
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.ranger.plugin.model.RangerPolicy
+import org.apache.ranger.plugin.policyengine.RangerAccessResult
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project, SubmarineDataMasking, Subquery}
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateViewCommand, InsertIntoDataSourceDirCommand}
+import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
+
+import org.apache.submarine.spark.security.{RangerSparkAccessRequest, RangerSparkAuditHandler, RangerSparkPlugin, RangerSparkResource, SparkAccessType}
+
+/**
+ * An Apache Spark's [[Optimizer]] extension for column data masking.
+ */
+case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[LogicalPlan] {
+ import RangerPolicy._
+
+ // register all built-in masking udfs
+ Map("mask" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMask",
+ "mask_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskFirstN",
+ "mask_hash" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash",
+ "mask_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskLastN",
+ "mask_show_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowFirstN",
+ "mask_show_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowLastN")
+ .map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
+ .foreach(spark.sessionState.catalog.registerFunction(_, true))
+
+ private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
+ private lazy val sqlParser = spark.sessionState.sqlParser
+ private lazy val analyzer = spark.sessionState.analyzer
+ private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+
+ /**
+ * Collecting transformers from Ranger data masking policies, and mapping them to the
+ * [[LogicalPlan]] output attributes.
+ *
+ * @param plan the original logical plan with an underlying catalog table
+ * @param table the catalog table
+ * @return a list of key-value pairs of original expression with its masking representation
+ */
+ private def collectTransformers(
+ plan: LogicalPlan,
+ table: CatalogTable,
+ aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
+ val auditHandler = new RangerSparkAuditHandler()
+ val ugi = UserGroupInformation.getCurrentUser
+ val userName = ugi.getShortUserName
+ val groups = ugi.getGroupNames.toSet
+ try {
+ val identifier = table.identifier
+ import org.apache.submarine.spark.security.SparkObjectType._
+
+ val maskEnableResults = plan.output.map { expr =>
+ val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, expr.name)
+ val req = new RangerSparkAccessRequest(resource, userName, groups, COLUMN.toString,
+ SparkAccessType.SELECT, sparkPlugin.getClusterName)
+ (expr, sparkPlugin.evalDataMaskPolicies(req, auditHandler))
+ }.filter(x => isMaskEnabled(x._2))
+
+ val originMaskers = maskEnableResults.map { case (expr, result) =>
+ if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
+ val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
+ val plan = analyzer.execute(sqlParser.parsePlan(sql))
+ (expr, plan)
+ } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
+ val maskVal = result.getMaskedValue
+ if (maskVal == null) {
+ val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
+ val plan = analyzer.execute(sqlParser.parsePlan(sql))
+ (expr, plan)
+ } else {
+ val sql = s"SELECT ${maskVal.replace("{col}", expr.name)} AS ${expr.name} FROM" +
+ s" ${table.qualifiedName}"
+ val plan = analyzer.execute(sqlParser.parsePlan(sql))
+ (expr, plan)
+ }
+ } else if (result.getMaskTypeDef != null) {
+ val transformer = result.getMaskTypeDef.getTransformer
+ if (StringUtils.isNotEmpty(transformer)) {
+ val trans = transformer.replace("{col}", expr.name)
+ val sql = s"SELECT $trans AS ${expr.name} FROM ${table.qualifiedName}"
+ val plan = analyzer.execute(sqlParser.parsePlan(sql))
+ (expr, plan)
+ } else {
+ (expr, null)
+ }
+ } else {
+ (expr, null)
+ }
+ }.filter(_._2 != null)
+
+ val formedMaskers: Map[ExprId, Alias] =
+ originMaskers.map { case (expr, p) => (expr, p.asInstanceOf[Project].projectList.head) }
+ .map { case (expr, attr) =>
+ val originalAlias = attr.asInstanceOf[Alias]
+ val newChild = originalAlias.child mapChildren {
+ case _: AttributeReference => expr
+ case o => o
+ }
+ val newAlias = originalAlias.copy(child = newChild)(
+ originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+ (expr.exprId, newAlias)
+ }.toMap
+
+ val aliasedMaskers = new mutable.HashMap[ExprId, Alias]()
+ for ((alias, id) <- aliases if formedMaskers.contains(id)) {
+ val originalAlias = formedMaskers(id)
+ val newChild = originalAlias.child mapChildren {
+ case ar: AttributeReference =>
+ ar.copy(name = alias.name)(alias.exprId, alias.qualifier)
+ case o => o
+ }
+ val newAlias = originalAlias.copy(child = newChild, alias.name)(
+ originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+ aliasedMaskers.put(alias.exprId, newAlias)
+ }
+ formedMaskers ++ aliasedMaskers
+ } catch {
+ case e: Exception => throw e
+ }
+ }
+
+ private def isMaskEnabled(result: RangerAccessResult): Boolean = {
+ result != null && result.isMaskEnabled
+ }
+
+ private def hasCatalogTable(plan: LogicalPlan): Boolean = plan match {
+ case _: HiveTableRelation => true
+ case l: LogicalRelation if l.catalogTable.isDefined => true
+ case _ => false
+ }
+
+ private def collectAllAliases(plan: LogicalPlan): mutable.HashMap[Alias, ExprId] = {
+ val aliases = new mutable.HashMap[Alias, ExprId]()
+ plan.transformAllExpressions {
+ case a: Alias =>
+ a.child match {
+ case ne: NamedExpression =>
+ aliases.put(a, ne.exprId)
+ case _ =>
+ }
+ a
+ }
+ aliases
+ }
+
+ private def collectAllTransformers(
+ plan: LogicalPlan,
+ aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
+ plan.collectLeaves().flatMap {
+ case h: HiveTableRelation =>
+ collectTransformers(h, h.tableMeta, aliases)
+ case l: LogicalRelation if l.catalogTable.isDefined =>
+ collectTransformers(l, l.catalogTable.get, aliases)
+ case _ => Seq.empty
+ }.toMap
+ }
+
+ private def doMasking(plan: LogicalPlan): LogicalPlan = plan match {
+ case s: Subquery => s
+ case m: SubmarineDataMasking => m // escape the optimize iteration if already masked
+ case fixed if fixed.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty => fixed
+ case _ =>
+ val aliases = collectAllAliases(plan)
+ val transformers = collectAllTransformers(plan, aliases)
+ val newPlan =
+ if (transformers.nonEmpty && plan.output.exists(o => transformers.get(o.exprId).nonEmpty)) {
+ val newOutput = plan.output.map(attr => transformers.getOrElse(attr.exprId, attr))
+ Project(newOutput, plan)
+ } else {
+ plan
+ }
+
+ val marked = newPlan transformUp {
+ case p if hasCatalogTable(p) => SubmarineDataMasking(p)
+ }
+
+ marked transformAllExpressions {
+ case s: SubqueryExpression =>
+ val Subquery(newPlan) =
+ rangerSparkOptimizer.execute(Subquery(SubmarineDataMasking(s.plan)))
+ s.withNewPlan(newPlan)
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+ case c: Command => c match {
+ case c: CreateDataSourceTableAsSelectCommand => c.copy(query = doMasking(c.query))
+ case c: CreateHiveTableAsSelectCommand => c.copy(query = doMasking(c.query))
+ case c: CreateViewCommand => c.copy(child = doMasking(c.child))
+ case i: InsertIntoDataSourceCommand => i.copy(query = doMasking(i.query))
+ case i: InsertIntoDataSourceDirCommand => i.copy(query = doMasking(i.query))
+ case i: InsertIntoHadoopFsRelationCommand => i.copy(query = doMasking(i.query))
+ case i: InsertIntoHiveDirCommand => i.copy(query = doMasking(i.query))
+ case i: InsertIntoHiveTable => i.copy(query = doMasking(i.query))
+ case s: SaveIntoDataSourceCommand => s.copy(query = doMasking(s.query))
+ case cmd => cmd
+ }
+ case other => doMasking(other)
+ }
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
similarity index 60%
copy from submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
copy to submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
index ae9aad8..4eba657 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
@@ -15,18 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.expressions.Attribute
/**
- * An Apache Spark's [[Strategy]] extension for omitting marker for row level filtering and data
- * masking.
+ * A marker [[LogicalPlan]] for column data masking, which will be removed during
+ * LogicalPlan -> PhysicalPlan
*/
-case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends Strategy {
- override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case SubmarineRowFilter(child) => planLater(child) :: Nil
- case _ => Nil
- }
-}
\ No newline at end of file
+case class SubmarineDataMasking(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
index ae9aad8..b768eb6 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineDataMasking, SubmarineRowFilter}
/**
* An Apache Spark's [[Strategy]] extension for omitting marker for row level filtering and data
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFil
case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case SubmarineRowFilter(child) => planLater(child) :: Nil
+ case SubmarineDataMasking(child) => planLater(child) :: Nil
case _ => Nil
}
-}
\ No newline at end of file
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
index 80dc127..e90e2f3 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
@@ -18,13 +18,14 @@
package org.apache.submarine.spark.security
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
class RangerSparkSQLExtension extends Extensions {
override def apply(ext: SparkSessionExtensions): Unit = {
ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
ext.injectOptimizerRule(SubmarineRowFilterExtension)
+ ext.injectOptimizerRule(SubmarineDataMaskingExtension)
ext.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
}
}
diff --git a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
index 6691216..b356fd5 100644
--- a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
+++ b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
@@ -2128,6 +2128,70 @@
"guid": "98a04cd7-8d14-4466-adc9-126d87a3af69",
"isEnabled": true,
"version": 1
+ },
+ {
+ "service": "hive_jenkins",
+ "name": "rangertbl5_value_show_last_4",
+ "policyType": 1,
+ "policyPriority": 0,
+ "description": "",
+ "isAuditEnabled": true,
+ "resources": {
+ "database": {
+ "values": [
+ "default"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "column": {
+ "values": [
+ "value"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ },
+ "table": {
+ "values": [
+ "rangertbl5"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [
+ {
+ "dataMaskInfo": {
+ "dataMaskType": "MASK_SHOW_LAST_4"
+ },
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "bob"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }
+ ],
+ "rowFilterPolicyItems": [],
+ "options": {},
+ "validitySchedules": [],
+ "policyLabels": [
+ ""
+ ],
+ "id": 32,
+ "guid": "b3f1f1e0-2bd6-4b20-8a32-a531006ae151",
+ "isEnabled": true,
+ "version": 1
}
],
"serviceDef": {
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
index 1472413..7f29cd7 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
object SubmarineSparkUtils {
@@ -40,4 +40,9 @@ object SubmarineSparkUtils {
spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
}
+
+ def enableDataMasking(spark: SparkSession): Unit = {
+ spark.extensions.injectOptimizerRule(SubmarineDataMaskingExtension)
+ spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
+ }
}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala
new file mode 100644
index 0000000..be12a93
--- /dev/null
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{Project, SubmarineDataMasking}
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineDataMaskingExtensionTest extends FunSuite with BeforeAndAfterAll {
+
+ private val spark = TestHive.sparkSession.newSession()
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ spark.reset()
+ }
+
+ test("applying condition to original query if data masking exists in ranger") {
+ val extension = SubmarineDataMaskingExtension(spark)
+ val frame = spark.sql("select * from src")
+ SubmarineSparkUtils.withUser("bob") {
+ val plan = extension.apply(frame.queryExecution.optimizedPlan)
+ assert(plan.asInstanceOf[Project].projectList(1).collectLeaves().length === 7)
+ assert(plan.children.head.isInstanceOf[SubmarineDataMasking])
+ }
+
+ SubmarineSparkUtils.withUser("alice") {
+ val plan = extension.apply(frame.queryExecution.optimizedPlan)
+ assert(plan.isInstanceOf[SubmarineDataMasking])
+ }
+ }
+}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
new file mode 100644
index 0000000..9a4d23c
--- /dev/null
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.spark.security
+
+import org.apache.commons.codec.digest.DigestUtils
+import org.apache.spark.sql.SubmarineSparkUtils.{enableDataMasking, withUser}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, SubmarineDataMasking}
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+case class DataMaskingSQLTest() extends FunSuite with BeforeAndAfterAll {
+ private val spark = TestHive.sparkSession.newSession()
+ private lazy val sql = spark.sql _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl3 AS SELECT * FROM default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl4 AS SELECT * FROM default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl5 AS SELECT * FROM default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl6 AS SELECT * FROM default.src
+ """.stripMargin)
+ enableDataMasking(spark)
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ spark.reset()
+ }
+
+ test("simple query") {
+ val statement = "select * from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+ assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+ assert(df.queryExecution.optimizedPlan.isInstanceOf[Project])
+ val project = df.queryExecution.optimizedPlan.asInstanceOf[Project]
+ val masker = project.projectList(1)
+ assert(masker.name === "value")
+ assert(masker.children.exists(_.sql.contains("mask_show_last_n")))
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("x"), "values should be masked")
+ }
+ withUser("alice") {
+ assert(!sql(statement).take(1)(0).getString(1).startsWith("x"))
+ }
+ }
+
+ test("projection with ranger filter key") {
+ withUser("bob") {
+ val statement = "select key from default.src where key = 0"
+ val df = sql(statement)
+ assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) === 0, "key is not masked")
+ }
+ withUser("bob") {
+ val statement = "select value from default.src where key = 0"
+ val df = sql(statement)
+ assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+ val row = df.take(1)(0)
+ assert(row.getString(0).startsWith("x"), "value is masked")
+ }
+ }
+
+ test("alias") {
+ val statement = "select key as k1, value v1 from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("x"), "values should be masked")
+ }
+ }
+
+ test("agg") {
+ val statement = "select sum(key) as k1, value v1 from default.src group by v1"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("x"), "values should be masked")
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("val"), "values should not be masked")
+ }
+ }
+
+ test("MASK") {
+ val statement = "select * from default.rangertbl1"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("x"), "values should be masked")
+ }
+ }
+
+ test("MASK_SHOW_FIRST_4") {
+ val statement = "select * from default.rangertbl2"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("val_x"), "values should show first 4 characters")
+ }
+ }
+
+ test("MASK_HASH") {
+ val statement = "select * from default.rangertbl3 where value = 'val_277'"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1) === DigestUtils.md5Hex("val_277"), "value is hashed")
+ }
+ }
+
+ test("MASK_SHOW_LAST_4") {
+ val statement = "select * from default.rangertbl5 where value = 'val_277'"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
+ }
+ }
+
+ test("NO MASKING") {
+ val statement = "select * from default.rangertbl6 where value = 'val_277'"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1) === "val_277", "value has no mask")
+ }
+ }
+
+ test("commands") {
+ withUser("bob") {
+ val statement = "create view v1 as select * from default.rangertbl5 where value = 'val_277'"
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+
+ val row = sql("select * from v1").take(1)(0)
+ assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org