You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/17 14:18:58 UTC

[submarine] branch master updated: SUBMARINE-434. Refine data masking plan resolution for spark security

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 40cd081  SUBMARINE-434. Refine data masking plan resolution for spark security
40cd081 is described below

commit 40cd08109cadf50e34f8cb862a21dd7699ce9c73
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Tue Mar 17 21:07:51 2020 +0800

    SUBMARINE-434. Refine data masking plan resolution for spark security
    
    ### What is this PR for?
    Refine data masking plan resolution for spark security
    
    Now we use parseExpression to generate the alias for the masked output, which is neater.
    
    ### What type of PR is it?
    
    Refactoring
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-434
    
    ### How should this be tested?
    add tests
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Kent Yao <ya...@hotmail.com>
    
    Closes #233 from yaooqinn/SUBMARINE-434 and squashes the following commits:
    
    a4cc16b [Kent Yao] SUBMARINE-434. Refine data masking plan resolution for spark security
---
 .../optimizer/SubmarineDataMaskingExtension.scala  | 149 +++++++++++----------
 .../spark/security/RangerSparkAuditHandler.scala   |   2 +-
 .../spark/security/DataMaskingSQLTest.scala        |  20 +++
 3 files changed, 100 insertions(+), 71 deletions(-)

diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 094abc7..a829eae 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -23,25 +23,26 @@ import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.ranger.plugin.model.RangerPolicy
 import org.apache.ranger.plugin.policyengine.RangerAccessResult
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project, SubmarineDataMasking, Subquery}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, CreateViewCommand, InsertIntoDataSourceDirCommand}
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
-
-import org.apache.submarine.spark.security.{RangerSparkAccessRequest, RangerSparkAuditHandler, RangerSparkPlugin, RangerSparkResource, SparkAccessType}
+import org.apache.submarine.spark.security._
+import org.apache.submarine.spark.security.SparkObjectType.COLUMN
 
 /**
  * An Apache Spark's [[Optimizer]] extension for column data masking.
+ * TODO(kent yao) implement this as analyzer rule
  */
 case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[LogicalPlan] {
-  import RangerPolicy._
+  import org.apache.ranger.plugin.model.RangerPolicy._
 
   // register all built-in masking udfs
   Map("mask" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMask",
@@ -51,12 +52,71 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
     "mask_show_first_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowFirstN",
     "mask_show_last_n" -> "org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskShowLastN")
     .map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
-    .foreach(spark.sessionState.catalog.registerFunction(_, true))
+    .foreach(spark.sessionState.catalog.registerFunction(_, overrideIfExists = true))
 
   private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
   private lazy val sqlParser = spark.sessionState.sqlParser
   private lazy val analyzer = spark.sessionState.analyzer
-  private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+  private lazy val auditHandler = RangerSparkAuditHandler()
+  private def currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
+
+  /**
+   * Get RangerAccessResult from ranger admin or local policies, which contains data masking rules
+   */
+  private def getAccessResult(identifier: TableIdentifier, attr: Attribute): RangerAccessResult = {
+    val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, attr.name)
+    val req = new RangerSparkAccessRequest(
+      resource,
+      currentUser.getShortUserName,
+      currentUser.getGroupNames.toSet,
+      COLUMN.toString,
+      SparkAccessType.SELECT,
+      sparkPlugin.getClusterName)
+    sparkPlugin.evalDataMaskPolicies(req, auditHandler)
+  }
+
+  /**
+   * Generate an [[Alias]] expression with the access result and original expression, which can be
+   * used to replace the original output of the query.
+   *
+   * This alias contains a child, which might be a null literal or an [[UnresolvedFunction]]. When
+   * the child is a function, its [[UnresolvedAttribute]] arguments are replaced with the input
+   * attribute so that they are resolved directly.
+   */
+  private def getMasker(attr: Attribute, result: RangerAccessResult): Alias = {
+    val expr = if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
+      "NULL"
+    } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
+      val maskVal = result.getMaskedValue
+      if (maskVal == null) {
+        "NULL"
+      } else {
+        s"${maskVal.replace("{col}", attr.name)}"
+      }
+    } else if (result.getMaskTypeDef != null) {
+      val transformer = result.getMaskTypeDef.getTransformer
+      if (StringUtils.isNotEmpty(transformer)) {
+        s"${transformer.replace("{col}", attr.name)}"
+      } else {
+        return null
+      }
+    } else {
+      return null
+    }
+
+    // sql expression text -> UnresolvedFunction
+    val parsed = sqlParser.parseExpression(expr)
+
+    // Here we replace the attribute with the resolved one, e.g.
+    // 'mask_show_last_n('value, 4, x, x, x, -1, 1)
+    // ->
+    // 'mask_show_last_n(value#37, 4, x, x, x, -1, 1)
+    val resolved = parsed mapChildren {
+      case _: UnresolvedAttribute => attr
+      case o => o
+    }
+    Alias(resolved, attr.name)()
+  }
 
   /**
    * Collecting transformers from Ranger data masking policies, and mapping the to the
@@ -70,65 +130,14 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
       plan: LogicalPlan,
       table: CatalogTable,
       aliases: mutable.Map[Alias, ExprId]): Map[ExprId, NamedExpression] = {
-    val auditHandler = new RangerSparkAuditHandler()
-    val ugi = UserGroupInformation.getCurrentUser
-    val userName = ugi.getShortUserName
-    val groups = ugi.getGroupNames.toSet
     try {
-      val identifier = table.identifier
-      import org.apache.submarine.spark.security.SparkObjectType._
-
       val maskEnableResults = plan.output.map { expr =>
-        val resource = RangerSparkResource(COLUMN, identifier.database, identifier.table, expr.name)
-        val req = new RangerSparkAccessRequest(resource, userName, groups, COLUMN.toString,
-          SparkAccessType.SELECT, sparkPlugin.getClusterName)
-        (expr, sparkPlugin.evalDataMaskPolicies(req, auditHandler))
+        expr -> getAccessResult(table.identifier, expr)
       }.filter(x => isMaskEnabled(x._2))
 
-      val originMaskers = maskEnableResults.map { case (expr, result) =>
-        if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_NULL)) {
-          val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
-          val plan = analyzer.execute(sqlParser.parsePlan(sql))
-          (expr, plan)
-        } else if (StringUtils.equalsIgnoreCase(result.getMaskType, MASK_TYPE_CUSTOM)) {
-          val maskVal = result.getMaskedValue
-          if (maskVal == null) {
-            val sql = s"SELECT NULL AS ${expr.name} FROM ${table.qualifiedName}"
-            val plan = analyzer.execute(sqlParser.parsePlan(sql))
-            (expr, plan)
-          } else {
-            val sql = s"SELECT ${maskVal.replace("{col}", expr.name)} AS ${expr.name} FROM" +
-              s" ${table.qualifiedName}"
-            val plan = analyzer.execute(sqlParser.parsePlan(sql))
-            (expr, plan)
-          }
-        } else if (result.getMaskTypeDef != null) {
-          val transformer = result.getMaskTypeDef.getTransformer
-          if (StringUtils.isNotEmpty(transformer)) {
-            val trans = transformer.replace("{col}", expr.name)
-            val sql = s"SELECT $trans AS ${expr.name} FROM ${table.qualifiedName}"
-            val plan = analyzer.execute(sqlParser.parsePlan(sql))
-            (expr, plan)
-          } else {
-            (expr, null)
-          }
-        } else {
-          (expr, null)
-        }
-      }.filter(_._2 != null)
-
-      val formedMaskers: Map[ExprId, Alias] =
-        originMaskers.map { case (expr, p) => (expr, p.asInstanceOf[Project].projectList.head) }
-          .map { case (expr, attr) =>
-            val originalAlias = attr.asInstanceOf[Alias]
-            val newChild = originalAlias.child mapChildren {
-              case _: AttributeReference => expr
-              case o => o
-            }
-            val newAlias = originalAlias.copy(child = newChild)(
-              originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
-            (expr.exprId, newAlias)
-          }.toMap
+      val formedMaskers = maskEnableResults.map { case (expr, result) =>
+        expr.exprId -> getMasker(expr, result)
+      }.filter(_._2 != null).toMap
 
       val aliasedMaskers = new mutable.HashMap[ExprId, Alias]()
       for ((alias, id) <- aliases if formedMaskers.contains(id)) {
@@ -138,10 +147,10 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
             ar.copy(name = alias.name)(alias.exprId, alias.qualifier)
           case o => o
         }
-        val newAlias = originalAlias.copy(child = newChild, alias.name)(
-          originalAlias.exprId, originalAlias.qualifier, originalAlias.explicitMetadata)
+        val newAlias = Alias(newChild, alias.name)()
         aliasedMaskers.put(alias.exprId, newAlias)
       }
+
       formedMaskers ++ aliasedMaskers
     } catch {
       case e: Exception => throw e
@@ -199,14 +208,14 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
           plan
         }
 
-      val marked = newPlan transformUp {
+      // Call spark analysis here explicitly to resolve UnresolvedFunctions
+      val marked = analyzer.execute(newPlan) transformUp {
         case p if hasCatalogTable(p) => SubmarineDataMasking(p)
       }
 
       marked transformAllExpressions {
         case s: SubqueryExpression =>
-          val Subquery(newPlan) =
-            rangerSparkOptimizer.execute(Subquery(SubmarineDataMasking(s.plan)))
+          val Subquery(newPlan) = Subquery(SubmarineDataMasking(s.plan))
           s.withNewPlan(newPlan)
       }
   }
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala
index 32eb351..fff830a 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuditHandler.scala
@@ -21,7 +21,7 @@ package org.apache.submarine.spark.security
 
 import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler
 
-class RangerSparkAuditHandler extends RangerDefaultAuditHandler {
+case class RangerSparkAuditHandler() extends RangerDefaultAuditHandler {
 
   // TODO(Kent Yao): Implementing meaningfully audit functions
 
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
index b2de098..f8b6dff 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -113,6 +113,16 @@ case class DataMaskingSQLTest() extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("alias, non-alias coexists") {
+    val statement = "select key as k1, value, value v1 from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+      assert(row.getString(2).startsWith("x"), "values should be masked")
+    }
+  }
+
   test("agg") {
     val statement = "select sum(key) as k1, value v1 from default.src group by v1"
     withUser("bob") {
@@ -158,6 +168,16 @@ case class DataMaskingSQLTest() extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("MASK_NULL") {
+    val statement = "select * from default.rangertbl4 where value = 'val_277'"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1) === null, "value is hashed")
+    }
+  }
+
   test("MASK_SHOW_LAST_4") {
     val statement = "select * from default.rangertbl5 where value = 'val_277'"
     withUser("bob") {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org