Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/21 13:22:46 UTC

[submarine] branch master updated: SUBMARINE-448. Make Spark Security's RangerSparkPlugin static

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f75f51  SUBMARINE-448. Make Spark Security's RangerSparkPlugin static
9f75f51 is described below

commit 9f75f51205b69d5314bfbb1f1d546e8cbbd8f5c3
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sat Mar 21 20:05:54 2020 +0800

    SUBMARINE-448. Make Spark Security's RangerSparkPlugin static
    
    ### What is this PR for?
    
    Currently, RangerSparkPlugin is non-static: build() returns a fresh Builder on every call, so each RangerSparkPlugin.build().getOrCreate() call site creates and initializes its own plugin instance, decoding the full policy set again each time. It should be a Scala object, not a class, so that the expensive policy decoding happens only once.
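
    A minimal sketch of the difference (simplified, illustrative names rather
    than the real Ranger classes):

        class BasePlugin(serviceType: String) {
          def init(): Unit = println(s"decoding $serviceType policies ...") // expensive in the real plugin
        }

        // Old shape: build() hands out a fresh Builder, so every call site
        // creates and initializes its own plugin.
        class PluginClass private extends BasePlugin("spark")
        object PluginClass {
          class Builder {
            @volatile private var plugin: PluginClass = _
            def getOrCreate(): PluginClass = synchronized {
              if (plugin == null) { plugin = new PluginClass; plugin.init() }
              plugin
            }
          }
          def build(): Builder = new Builder
        }

        // New shape: a Scala object is a JVM singleton; its body runs once.
        object PluginObject extends BasePlugin("spark") {
          init()
        }

        object Demo extends App {
          PluginClass.build().getOrCreate() // decodes policies
          PluginClass.build().getOrCreate() // decodes policies again
          val first = PluginObject          // decodes policies once, on first reference
          val second = PluginObject         // already initialized, prints nothing
        }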
    
    ### What type of PR is it?
    Refactoring
    ### Todos
    
    ### What is the Jira issue?
    
    https://issues.apache.org/jira/browse/SUBMARINE-448
    
    ### How should this be tested?
    
    Existing tests.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Kent Yao <ya...@hotmail.com>
    
    Closes #242 from yaooqinn/SUBMARINE-448 and squashes the following commits:
    
    9beaad5 [Kent Yao] SUBMARINE-448. Make RangerSparkPlugin static
---
 .../optimizer/SubmarineDataMaskingExtension.scala  |  5 ++--
 .../optimizer/SubmarineRowFilterExtension.scala    |  5 ++--
 .../spark/security/RangerSparkAuthorizer.scala     | 19 ++++++------
 .../spark/security/RangerSparkPlugin.scala         | 35 +++++-----------------
 .../src/test/resources/log4j.properties            |  2 ++
 .../spark/security/RangerAdminClientImpl.scala     | 19 +++++++-----
 6 files changed, 35 insertions(+), 50 deletions(-)

diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 2713f8d..5dad02a 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -55,7 +55,6 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
     .map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
     .foreach(spark.sessionState.catalog.registerFunction(_, overrideIfExists = true))
 
-  private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
   private lazy val sqlParser = spark.sessionState.sqlParser
   private lazy val analyzer = spark.sessionState.analyzer
   private lazy val auditHandler = RangerSparkAuditHandler()
@@ -72,8 +71,8 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
       currentUser.getGroupNames.toSet,
       COLUMN.toString,
       SparkAccessType.SELECT,
-      sparkPlugin.getClusterName)
-    sparkPlugin.evalDataMaskPolicies(req, auditHandler)
+      RangerSparkPlugin.getClusterName)
+    RangerSparkPlugin.evalDataMaskPolicies(req, auditHandler)
   }
 
   /**
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
index 18f9520..7641368 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -38,7 +38,6 @@ import org.apache.submarine.spark.security._
  * An Apache Spark's [[Optimizer]] extension for row level filtering.
  */
 case class SubmarineRowFilterExtension(spark: SparkSession) extends Rule[LogicalPlan] {
-  private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
   private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
 
   /**
@@ -56,8 +55,8 @@ case class SubmarineRowFilterExtension(spark: SparkSession) extends Rule[Logical
       val ugi = UserGroupInformation.getCurrentUser
       val request = new RangerSparkAccessRequest(resource, ugi.getShortUserName,
         ugi.getGroupNames.toSet, SparkObjectType.TABLE.toString, SparkAccessType.SELECT,
-        sparkPlugin.getClusterName)
-      val result = sparkPlugin.evalRowFilterPolicies(request, auditHandler)
+        RangerSparkPlugin.getClusterName)
+      val result = RangerSparkPlugin.evalRowFilterPolicies(request, auditHandler)
       if (isRowFilterEnabled(result)) {
         val condition = spark.sessionState.sqlParser.parseExpression(result.getFilterExpr)
         val analyzed = spark.sessionState.analyzer.execute(Filter(condition, plan))
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala
index 45c1633..70fa093 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala
@@ -43,7 +43,6 @@ import org.apache.submarine.spark.security.SparkOperationType.SparkOperationType
 
 object RangerSparkAuthorizer {
   private val LOG = LogFactory.getLog(this.getClass.getSimpleName.stripSuffix("$"))
-  private val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
 
   private def currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
 
@@ -68,7 +67,7 @@ object RangerSparkAuthorizer {
       if (inputs.isEmpty && opType == SparkOperationType.SHOWDATABASES) {
         val resource = new RangerSparkResource(SparkObjectType.DATABASE, None)
         requests += new RangerSparkAccessRequest(resource, user, groups, opType.toString,
-          SparkAccessType.USE, sparkPlugin.getClusterName)
+          SparkAccessType.USE, RangerSparkPlugin.getClusterName)
       }
 
       def addAccessRequest(objs: Seq[SparkPrivilegeObject], isInput: Boolean): Unit = {
@@ -89,7 +88,7 @@ object RangerSparkAuthorizer {
               if (accessType != SparkAccessType.NONE && !requests.exists(
                 o => o.getSparkAccessType == accessType && o.getResource == resource)) {
                 requests += new RangerSparkAccessRequest(resource, user, groups, opType.toString,
-                  accessType, sparkPlugin.getClusterName)
+                  accessType, RangerSparkPlugin.getClusterName)
               }
             }
           }
@@ -102,7 +101,7 @@ object RangerSparkAuthorizer {
         val resource = request.getResource.asInstanceOf[RangerSparkResource]
         if (resource.getObjectType == SparkObjectType.COLUMN &&
           StringUtils.contains(resource.getColumn, ",")) {
-          resource.setServiceDef(sparkPlugin.getServiceDef)
+          resource.setServiceDef(RangerSparkPlugin.getServiceDef)
           val colReqs: JList[RangerAccessRequest] = resource.getColumn.split(",")
             .filter(StringUtils.isNotBlank).map { c =>
             val colRes = new RangerSparkResource(SparkObjectType.COLUMN,
@@ -111,7 +110,7 @@ object RangerSparkAuthorizer {
             colReq.setResource(colRes)
             colReq.asInstanceOf[RangerAccessRequest]
           }.toList.asJava
-          val colResults = sparkPlugin.isAccessAllowed(colReqs, auditHandler)
+          val colResults = RangerSparkPlugin.isAccessAllowed(colReqs, auditHandler)
           if (colResults != null) {
             for (c <- colResults.asScala) {
               if (c != null && !c.getIsAllowed) {
@@ -121,7 +120,7 @@ object RangerSparkAuthorizer {
             }
           }
         } else {
-          val result = sparkPlugin.isAccessAllowed(request, auditHandler)
+          val result = RangerSparkPlugin.isAccessAllowed(request, auditHandler)
           if (result != null && !result.getIsAllowed) {
             throw new SparkAccessControlException(s"Permission denied: user [$user] does not" +
               s" have [${request.getSparkAccessType}] privilege on [${resource.getAsString}]")
@@ -141,8 +140,8 @@ object RangerSparkAuthorizer {
     createSparkResource(obj) match {
       case Some(resource) =>
         val request =
-          new RangerSparkAccessRequest(resource, user, groups, sparkPlugin.getClusterName)
-        val result = sparkPlugin.isAccessAllowed(request)
+          new RangerSparkAccessRequest(resource, user, groups, RangerSparkPlugin.getClusterName)
+        val result = RangerSparkPlugin.isAccessAllowed(request)
         if (request == null) {
           LOG.error("Internal error: null RangerAccessResult received back from isAccessAllowed")
           false
@@ -255,7 +254,7 @@ object RangerSparkAuthorizer {
           obj.getColumns.mkString(","))
       case _ => null
     }
-    if (resource != null) resource.setServiceDef(sparkPlugin.getServiceDef)
+    if (resource != null) resource.setServiceDef(RangerSparkPlugin.getServiceDef)
     resource
   }
 
@@ -299,6 +298,6 @@ object RangerSparkAuthorizer {
   }
 
   private def isPathInFSScheme(objectName: String): Boolean = {
-    objectName.nonEmpty && sparkPlugin.fsScheme.exists(objectName.startsWith)
+    objectName.nonEmpty && RangerSparkPlugin.fsScheme.exists(objectName.startsWith)
   }
 }
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala
index 58f9f8d..3e34ab7 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala
@@ -25,10 +25,13 @@ import org.apache.commons.logging.LogFactory
 import org.apache.ranger.authorization.hadoop.config.RangerConfiguration
 import org.apache.ranger.plugin.service.RangerBasePlugin
 
-class RangerSparkPlugin private extends RangerBasePlugin("spark", "sparkSql") {
-  import RangerSparkPlugin._
+object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql") {
 
-  private val LOG = LogFactory.getLog(classOf[RangerSparkPlugin])
+  private val LOG = LogFactory.getLog(RangerSparkPlugin.getClass)
+
+  private val rangerConf: RangerConfiguration = RangerConfiguration.getInstance
+  val showColumnsOption: String = rangerConf.get(
+    "xasecure.spark.describetable.showcolumns.authorization.option", "NONE")
 
   lazy val fsScheme: Array[String] = RangerConfiguration.getInstance()
     .get("ranger.plugin.spark.urlauth.filesystem.schemes", "hdfs:,file:")
@@ -50,29 +53,7 @@ class RangerSparkPlugin private extends RangerBasePlugin("spark", "sparkSql") {
     }
     LOG.info("Policy cache directory successfully set to " + cacheDir.getAbsolutePath)
   }
-}
 
-object RangerSparkPlugin {
-
-  private val rangerConf: RangerConfiguration = RangerConfiguration.getInstance
-
-  val showColumnsOption: String = rangerConf.get(
-    "xasecure.spark.describetable.showcolumns.authorization.option", "NONE")
-
-  def build(): Builder = new Builder
-
-  class Builder {
-
-    @volatile private var sparkPlugin: RangerSparkPlugin = _
-
-    def getOrCreate(): RangerSparkPlugin = RangerSparkPlugin.synchronized {
-      if (sparkPlugin == null) {
-        sparkPlugin = new RangerSparkPlugin
-        sparkPlugin.init()
-        sparkPlugin
-      } else {
-        sparkPlugin
-      }
-    }
-  }
+  init()
 }
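
A note on the replacement above: a Scala object's body (including the init() call) runs lazily, on the first reference from any call site, and the JVM serializes that one-time initialization, so the @volatile/synchronized Builder being deleted here is no longer needed. A minimal sketch of that guarantee (hypothetical names, not project code):

    object CountingInit {
      @volatile var count = 0
      count += 1 // object body: the JVM runs this at most once, under its class-init lock
    }

    object ConcurrentDemo extends App {
      // Eight threads race to be the first to touch the object ...
      val threads = (1 to 8).map { _ =>
        new Thread(new Runnable { def run(): Unit = CountingInit })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())
      println(s"initialized ${CountingInit.count} time(s)") // ... it still initializes once
    }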
+
diff --git a/submarine-security/spark-security/src/test/resources/log4j.properties b/submarine-security/spark-security/src/test/resources/log4j.properties
index d3d93a7..d633dc8 100644
--- a/submarine-security/spark-security/src/test/resources/log4j.properties
+++ b/submarine-security/spark-security/src/test/resources/log4j.properties
@@ -21,3 +21,5 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=OFF
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala
index 14bca20..9e2b9bd 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala
@@ -32,17 +32,22 @@ class RangerAdminClientImpl extends RangerAdminRESTClient {
   private val cacheFilename = "sparkSql_hive_jenkins.json"
   private val gson =
     new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create
-
-  override def init(serviceName: String, appId: String, configPropertyPrefix: String): Unit = {}
+  private var policies: ServicePolicies = _
+
+  override def init(serviceName: String, appId: String, configPropertyPrefix: String): Unit = {
+    if (policies == null) {
+      val basedir = this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath
+      val cachePath = FileSystems.getDefault.getPath(basedir, cacheFilename)
+      LOG.info("Reading policies from " + cachePath)
+      val bytes = Files.readAllBytes(cachePath)
+      policies = gson.fromJson(new String(bytes), classOf[ServicePolicies])
+    }
+  }
 
   override def getServicePoliciesIfUpdated(
       lastKnownVersion: Long,
       lastActivationTimeInMillis: Long): ServicePolicies = {
-    val basedir = this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath
-    val cachePath = FileSystems.getDefault.getPath(basedir, cacheFilename)
-    LOG.info("Reading policies from " + cachePath)
-    val bytes = Files.readAllBytes(cachePath)
-    gson.fromJson(new String(bytes), classOf[ServicePolicies])
+    policies
   }
 
   override def grantAccess(request: GrantRevokeRequest): Unit = {}
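
The test-side change mirrors the same idea: RangerAdminClientImpl now parses the policy JSON once, in init(), and getServicePoliciesIfUpdated simply returns the cached ServicePolicies instead of re-reading and re-deserializing the file on every poll. The read-once pattern in isolation (a sketch with hypothetical names):

    import java.nio.file.{Files, Paths}

    class OneShotLoader(path: String) {
      private var cached: String = _
      def init(): Unit =
        if (cached == null) cached = new String(Files.readAllBytes(Paths.get(path)))
      def latest(): String = cached // serves whatever init() loaded; no further I/O
    }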

