Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/11 06:57:34 UTC

[submarine] branch master updated: SUBMARINE-418. Support Data Masking for spark-security

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 97d6dfc  SUBMARINE-418. Support Data Masking for spark-security
97d6dfc is described below

commit 97d6dfc9e1c24f3869e6802a4afea58472b75ec4
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Wed Mar 11 14:21:42 2020 +0800

    SUBMARINE-418. Support Data Masking for spark-security
    
    ### What is this PR for?
    
    Add data masking support for spark-security, which enables administrators to mask sensitive data per column.
    
    e.g. the phone number 12345678901 can be masked to show only the last 4 digits: nnnnnnn8901
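
    For illustration, a minimal sketch of enabling the extension and querying a
    masked column; the table and column names here are hypothetical, and it
    assumes the spark-security jar is on the classpath with a Ranger
    MASK_SHOW_LAST_4 policy covering contacts.phone:

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          .config("spark.sql.extensions",
            "org.apache.submarine.spark.security.RangerSparkSQLExtension")
          .enableHiveSupport()
          .getOrCreate()

        // digits other than the last 4 come back masked with 'n':
        spark.sql("SELECT phone FROM contacts").show()  // 12345678901 -> nnnnnnn8901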
    
    ### What type of PR is it?
    feature
    ### Todos
    
    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-418
    
    ### How should this be tested?
    
    add unit tests
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Kent Yao <ya...@hotmail.com>
    
    Closes #219 from yaooqinn/SUBMARINE-418 and squashes the following commits:
    
    d2a70ea [Kent Yao] nit
    95cc726 [Kent Yao] SUBMARINE-418. Support Data Masking
---
 .../optimizer/SubmarineDataMaskingExtension.scala  | 227 +++++++++++++++++++++
 .../plans/logical/SubmarineDataMasking.scala}      |  18 +-
 .../execution/SubmarineSparkPlanOmitStrategy.scala |   5 +-
 .../spark/security/RangerSparkSQLExtension.scala   |   3 +-
 .../src/test/resources/sparkSql_hive_jenkins.json  |  64 ++++++
 .../org/apache/spark/sql/SubmarineSparkUtils.scala |   7 +-
 .../SubmarineDataMaskingExtensionTest.scala        |  48 +++++
 .../spark/security/DataMaskingSQLTest.scala        | 189 +++++++++++++++++
 8 files changed, 546 insertions(+), 15 deletions(-)

diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
new file mode 100644
index 0000000..7e2b99c
--- /dev/null
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.ranger.plugin.model.RangerPolicy
+import org.apache.ranger.plugin.policyengine.RangerAccessResult
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project, SubmarineDataMasking, Subquery}
+import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateViewCommand, InsertIntoDataSourceDirCommand}
+import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
+
+import org.apache.submarine.spark.security.{RangerSparkAccessRequest, RangerSparkAuditHandler, RangerSparkPlugin, RangerSparkResource, SparkAccessType}
+
+/**
+ * An Apache Spark [[Optimizer]] extension for column data masking.
+ */
+case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[LogicalPlan] {
+  import RangerPolicy._
+
+  // register Hive's built-in masking UDFs in the session catalog
+  Map("mask" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMask",
+    "mask_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskFirstN",
+    "mask_hash" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash",
+    "mask_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskLastN",
+    "mask_show_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowFirstN",
+    "mask_show_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowLastN")
+    .map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
+    .foreach(spark.sessionState.catalog.registerFunction(_, true))
+
+  private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
+  private lazy val sqlParser = spark.sessionState.sqlParser
+  private lazy val analyzer = spark.sessionState.analyzer
+  private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+
+  /**
+   * Collects transformers from Ranger data masking policies and maps them to the
+   * [[LogicalPlan]] output attributes.
+   *
+   * @param plan the original logical plan with an underlying catalog table
+   * @param table the catalog table
+   * @return a map from each original expression's [[ExprId]] to its masking representation
+   */
+  private def collectTransformers(
+      plan: LogicalPlan,
+      table: CatalogTable,
+      aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
+    val auditHandler = new RangerSparkAuditHandler()
+    val ugi = UserGroupInformation.getCurrentUser
+    val userName = ugi.getShortUserName
+    val groups = ugi.getGroupNames.toSet
+    try {
+      val identifier = table.identifier
+      import org.apache.submarine.spark.security.SparkObjectType._
+
+      val maskEnableResults = plan.output.map { expr =>
+        val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, expr.name)
+        val req = new RangerSparkAccessRequest(resource, userName, groups, COLUMN.toString,
+          SparkAccessType.SELECT, sparkPlugin.getClusterName)
+        (expr, sparkPlugin.evalDataMaskPolicies(req, auditHandler))
+      }.filter(x => isMaskEnabled(x._2))
+
+      val originMaskers = maskEnableResults.map { case (expr, result) =>
+        if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
+          val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
+          val plan = analyzer.execute(sqlParser.parsePlan(sql))
+          (expr, plan)
+        } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
+          val maskVal = result.getMaskedValue
+          if (maskVal == null) {
+            val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
+            val plan = analyzer.execute(sqlParser.parsePlan(sql))
+            (expr, plan)
+          } else {
+            val sql = s"SELECT ${maskVal.replace("{col}", expr.name)} AS ${expr.name} FROM" +
+              s" ${table.qualifiedName}"
+            val plan = analyzer.execute(sqlParser.parsePlan(sql))
+            (expr, plan)
+          }
+        } else if (result.getMaskTypeDef != null) {
+          val transformer = result.getMaskTypeDef.getTransformer
+          if (StringUtils.isNotEmpty(transformer)) {
+            val trans = transformer.replace("{col}", expr.name)
+            val sql = s"SELECT $trans AS ${expr.name} FROM ${table.qualifiedName}"
+            val plan = analyzer.execute(sqlParser.parsePlan(sql))
+            (expr, plan)
+          } else {
+            (expr, null)
+          }
+        } else {
+          (expr, null)
+        }
+      }.filter(_._2 != null)
+
+      val formedMaskers: Map[ExprId, Alias] =
+        originMaskers.map { case (expr, p) => (expr, p.asInstanceOf[Project].projectList.head) }
+          .map { case (expr, attr) =>
+            val originalAlias = attr.asInstanceOf[Alias]
+            val newChild = originalAlias.child mapChildren {
+              case _: AttributeReference => expr
+              case o => o
+            }
+            val newAlias = originalAlias.copy(child = newChild)(
+              originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+            (expr.exprId, newAlias)
+          }.toMap
+
+      val aliasedMaskers = new mutable.HashMap[ExprId, Alias]()
+      for ((alias, id) <- aliases if formedMaskers.contains(id)) {
+        val originalAlias = formedMaskers(id)
+        val newChild = originalAlias.child mapChildren {
+          case ar: AttributeReference =>
+            ar.copy(name = alias.name)(alias.exprId, alias.qualifier)
+          case o => o
+        }
+        val newAlias = originalAlias.copy(child = newChild, alias.name)(
+          originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+        aliasedMaskers.put(alias.exprId, newAlias)
+      }
+      formedMaskers ++ aliasedMaskers
+    } catch {
+      case e: Exception => throw e
+    }
+  }
+
+  private def isMaskEnabled(result: RangerAccessResult): Boolean = {
+    result != null && result.isMaskEnabled
+  }
+
+  private def hasCatalogTable(plan: LogicalPlan): Boolean = plan match {
+    case _: HiveTableRelation => true
+    case l: LogicalRelation if l.catalogTable.isDefined => true
+    case _ => false
+  }
+
+  private def collectAllAliases(plan: LogicalPlan): mutable.HashMap[Alias, ExprId] = {
+    val aliases = new mutable.HashMap[Alias, ExprId]()
+    plan.transformAllExpressions {
+      case a: Alias =>
+        a.child match {
+          case ne: NamedExpression =>
+            aliases.put(a, ne.exprId)
+          case _ =>
+        }
+        a
+    }
+    aliases
+  }
+
+  private def collectAllTransformers(
+      plan: LogicalPlan,
+      aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
+    plan.collectLeaves().flatMap {
+      case h: HiveTableRelation =>
+        collectTransformers(h, h.tableMeta, aliases)
+      case l: LogicalRelation if l.catalogTable.isDefined =>
+        collectTransformers(l, l.catalogTable.get, aliases)
+      case _ => Seq.empty
+    }.toMap
+  }
+
+  private def doMasking(plan: LogicalPlan): LogicalPlan = plan match {
+    case s: Subquery => s
+    case m: SubmarineDataMasking => m // skip further optimizer iterations if already masked
+    case fixed if fixed.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty => fixed
+    case _ =>
+      val aliases = collectAllAliases(plan)
+      val transformers = collectAllTransformers(plan, aliases)
+      val newPlan =
+        if (transformers.nonEmpty && plan.output.exists(o => transformers.get(o.exprId).nonEmpty)) {
+          val newOutput = plan.output.map(attr => transformers.getOrElse(attr.exprId, attr))
+          Project(newOutput, plan)
+        } else {
+          plan
+        }
+
+      val marked = newPlan transformUp {
+        case p if hasCatalogTable(p) => SubmarineDataMasking(p)
+      }
+
+      marked transformAllExpressions {
+        case s: SubqueryExpression =>
+          val Subquery(newPlan) =
+            rangerSparkOptimizer.execute(Subquery(SubmarineDataMasking(s.plan)))
+          s.withNewPlan(newPlan)
+      }
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case c: Command => c match {
+      case c: CreateDataSourceTableAsSelectCommand => c.copy(query = doMasking(c.query))
+      case c: CreateHiveTableAsSelectCommand => c.copy(query = doMasking(c.query))
+      case c: CreateViewCommand => c.copy(child = doMasking(c.child))
+      case i: InsertIntoDataSourceCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoDataSourceDirCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoHadoopFsRelationCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoHiveDirCommand => i.copy(query = doMasking(i.query))
+      case i: InsertIntoHiveTable => i.copy(query = doMasking(i.query))
+      case s: SaveIntoDataSourceCommand => s.copy(query = doMasking(s.query))
+      case cmd => cmd
+    }
+    case other => doMasking(other)
+  }
+}
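
To make the transformer rewrite in collectTransformers concrete: for a
MASK_SHOW_LAST_4 policy, Ranger's Hive service definition typically supplies a
transformer such as mask_show_last_n({col}, 4, 'x', 'x', 'x', -1, '1'); the
rule substitutes the column name into it, wraps the expression in a SELECT,
and re-analyzes the statement. A minimal sketch, assuming that transformer
string (values hypothetical, mirroring the code above):

    val transformer = "mask_show_last_n({col}, 4, 'x', 'x', 'x', -1, '1')"
    val columnName = "value"
    val sql = s"SELECT ${transformer.replace("{col}", columnName)} AS $columnName" +
      " FROM default.rangertbl5"
    // => SELECT mask_show_last_n(value, 4, 'x', 'x', 'x', -1, '1') AS value
    //    FROM default.rangertbl5   -- 'val_277' is returned as 'xxx_277'
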
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
similarity index 60%
copy from submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
copy to submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
index ae9aad8..4eba657 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineDataMasking.scala
@@ -15,18 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution
+package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
 /**
- * An Apache Spark's [[Strategy]] extension for omitting marker for row level filtering and data
- * masking.
+ * A marker [[LogicalPlan]] for column data masking, which is removed when the
+ * logical plan is translated into a physical plan
  */
-case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends Strategy {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case SubmarineRowFilter(child) => planLater(child) :: Nil
-    case _ => Nil
-  }
-}
\ No newline at end of file
+case class SubmarineDataMasking(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
index ae9aad8..b768eb6 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineDataMasking, SubmarineRowFilter}
 
 /**
  * An Apache Spark's [[Strategy]] extension for omitting marker for row level filtering and data
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubmarineRowFil
 case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case SubmarineRowFilter(child) => planLater(child) :: Nil
+    case SubmarineDataMasking(child) => planLater(child) :: Nil
     case _ => Nil
   }
-}
\ No newline at end of file
+}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
index 80dc127..e90e2f3 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
@@ -18,13 +18,14 @@
 package org.apache.submarine.spark.security
 
 import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
 import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 class RangerSparkSQLExtension extends Extensions {
   override def apply(ext: SparkSessionExtensions): Unit = {
     ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
     ext.injectOptimizerRule(SubmarineRowFilterExtension)
+    ext.injectOptimizerRule(SubmarineDataMaskingExtension)
     ext.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 }
diff --git a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
index 6691216..b356fd5 100644
--- a/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
+++ b/submarine-security/spark-security/src/test/resources/sparkSql_hive_jenkins.json
@@ -2128,6 +2128,70 @@
       "guid": "98a04cd7-8d14-4466-adc9-126d87a3af69",
       "isEnabled": true,
       "version": 1
+    },
+    {
+      "service": "hive_jenkins",
+      "name": "rangertbl5_value_show_last_4",
+      "policyType": 1,
+      "policyPriority": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "database": {
+          "values": [
+            "default"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "column": {
+          "values": [
+            "value"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        },
+        "table": {
+          "values": [
+            "rangertbl5"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [
+        {
+          "dataMaskInfo": {
+            "dataMaskType": "MASK_SHOW_LAST_4"
+          },
+          "accesses": [
+            {
+              "type": "select",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "bob"
+          ],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "rowFilterPolicyItems": [],
+      "options": {},
+      "validitySchedules": [],
+      "policyLabels": [
+        ""
+      ],
+      "id": 32,
+      "guid": "b3f1f1e0-2bd6-4b20-8a32-a531006ae151",
+      "isEnabled": true,
+      "version": 1
     }
   ],
   "serviceDef": {
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
index 1472413..7f29cd7 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
 import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 object SubmarineSparkUtils {
@@ -40,4 +40,9 @@ object SubmarineSparkUtils {
     spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
     spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
+
+  def enableDataMasking(spark: SparkSession): Unit = {
+    spark.extensions.injectOptimizerRule(SubmarineDataMaskingExtension)
+    spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
+  }
 }
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala
new file mode 100644
index 0000000..be12a93
--- /dev/null
+++ b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtensionTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{Project, SubmarineDataMasking}
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineDataMaskingExtensionTest extends FunSuite with BeforeAndAfterAll {
+
+  private val spark = TestHive.sparkSession.newSession()
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+  test("applying condition to original query if data masking exists in ranger") {
+    val extension = SubmarineDataMaskingExtension(spark)
+    val frame = spark.sql("select * from src")
+    SubmarineSparkUtils.withUser("bob") {
+      val plan = extension.apply(frame.queryExecution.optimizedPlan)
+      assert(plan.asInstanceOf[Project].projectList(1).collectLeaves().length === 7)
+      assert(plan.children.head.isInstanceOf[SubmarineDataMasking])
+    }
+
+    SubmarineSparkUtils.withUser("alice") {
+      val plan = extension.apply(frame.queryExecution.optimizedPlan)
+      assert(plan.isInstanceOf[SubmarineDataMasking])
+    }
+  }
+}
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
new file mode 100644
index 0000000..9a4d23c
--- /dev/null
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.spark.security
+
+import org.apache.commons.codec.digest.DigestUtils
+import org.apache.spark.sql.SubmarineSparkUtils.{enableDataMasking, withUser}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, SubmarineDataMasking}
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class DataMaskingSQLTest extends FunSuite with BeforeAndAfterAll {
+  private val spark = TestHive.sparkSession.newSession()
+  private lazy val sql = spark.sql _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl3 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl4 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl5 AS SELECT * FROM default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl6 AS SELECT * FROM default.src
+      """.stripMargin)
+    enableDataMasking(spark)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+  test("simple query") {
+    val statement = "select * from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      assert(df.queryExecution.optimizedPlan.isInstanceOf[Project])
+      val project = df.queryExecution.optimizedPlan.asInstanceOf[Project]
+      val masker = project.projectList(1)
+      assert(masker.name === "value")
+      assert(masker.children.exists(_.sql.contains("mask_show_last_n")))
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+    withUser("alice") {
+      assert(!sql(statement).take(1)(0).getString(1).startsWith("x"))
+    }
+  }
+
+  test("projection with ranger filter key") {
+    withUser("bob") {
+      val statement = "select key from default.src where key = 0"
+      val df = sql(statement)
+      assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) === 0, "key is not masked")
+    }
+    withUser("bob") {
+      val statement = "select value from default.src where key = 0"
+      val df = sql(statement)
+      assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      val row = df.take(1)(0)
+      assert(row.getString(0).startsWith("x"), "value is masked")
+    }
+  }
+
+  test("alias") {
+    val statement = "select key as k1, value v1 from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+  }
+
+  test("agg") {
+    val statement = "select sum(key) as k1, value v1 from default.src group by v1"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("val"), "values should not be masked")
+    }
+  }
+
+  test("MASK") {
+    val statement = "select * from default.rangertbl1"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+  }
+
+  test("MASK_SHOW_FIRST_4") {
+    val statement = "select * from default.rangertbl2"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("val_x"), "values should show first 4 characters")
+    }
+  }
+
+  test("MASK_HASH") {
+    val statement = "select * from default.rangertbl3 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === DigestUtils.md5Hex("val_277"), "value is hashed")
+    }
+  }
+
+  test("MASK_SHOW_LAST_4") {
+    val statement = "select * from default.rangertbl5 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
+    }
+  }
+
+  test("NO MASKING") {
+    val statement = "select * from default.rangertbl6 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === "val_277", "value has no mask")
+    }
+  }
+
+  test("commands") {
+    withUser("bob") {
+      val statement = "create view v1 as select * from default.rangertbl5 where value = 'val_277'"
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+
+      val row = sql("select * from v1").take(1)(0)
+      assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
+    }
+  }
+}
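
Since the extension registers Hive's masking UDFs in the session catalog, they
can also be exercised directly. A hedged sketch, assuming Hive's default
masking characters (digits become 'n', lowercase letters 'x'):

    spark.sql("SELECT mask_show_last_n('12345678901', 4)").show()  // nnnnnnn8901
    spark.sql("SELECT mask_hash('val_277')").show()  // MD5 hex digest, as asserted in DataMaskingSQLTest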

