You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/21 13:22:46 UTC
[submarine] branch master updated: SUBMARINE-448. Make Spark
Security's RangerSparkPlugin static
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 9f75f51 SUBMARINE-448. Make Spark Security's RangerSparkPlugin static
9f75f51 is described below
commit 9f75f51205b69d5314bfbb1f1d546e8cbbd8f5c3
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sat Mar 21 20:05:54 2020 +0800
SUBMARINE-448. Make Spark Security's RangerSparkPlugin static
### What is this PR for?
Currently the RangerSparkPlugin is non-static and gets created and initialized time and time again; it should be an object, not a class, in Scala, to avoid the heavy cost of repeatedly loading and decoding policies.
### What type of PR is it?
Refactoring
### Todos
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-448
### How should this be tested?
existing tests
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Kent Yao <ya...@hotmail.com>
Closes #242 from yaooqinn/SUBMARINE-448 and squashes the following commits:
9beaad5 [Kent Yao] SUBMARINE-448. Make RangerSparkPlugin static
---
.../optimizer/SubmarineDataMaskingExtension.scala | 5 ++--
.../optimizer/SubmarineRowFilterExtension.scala | 5 ++--
.../spark/security/RangerSparkAuthorizer.scala | 19 ++++++------
.../spark/security/RangerSparkPlugin.scala | 35 +++++-----------------
.../src/test/resources/log4j.properties | 2 ++
.../spark/security/RangerAdminClientImpl.scala | 19 +++++++-----
6 files changed, 35 insertions(+), 50 deletions(-)
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 2713f8d..5dad02a 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -55,7 +55,6 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
.map(x => CatalogFunction(FunctionIdentifier(x._1), x._2, Seq.empty))
.foreach(spark.sessionState.catalog.registerFunction(_, overrideIfExists = true))
- private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
private lazy val sqlParser = spark.sessionState.sqlParser
private lazy val analyzer = spark.sessionState.analyzer
private lazy val auditHandler = RangerSparkAuditHandler()
@@ -72,8 +71,8 @@ case class SubmarineDataMaskingExtension(spark: SparkSession) extends Rule[Logic
currentUser.getGroupNames.toSet,
COLUMN.toString,
SparkAccessType.SELECT,
- sparkPlugin.getClusterName)
- sparkPlugin.evalDataMaskPolicies(req, auditHandler)
+ RangerSparkPlugin.getClusterName)
+ RangerSparkPlugin.evalDataMaskPolicies(req, auditHandler)
}
/**
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
index 18f9520..7641368 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -38,7 +38,6 @@ import org.apache.submarine.spark.security._
* An Apache Spark's [[Optimizer]] extension for row level filtering.
*/
case class SubmarineRowFilterExtension(spark: SparkSession) extends Rule[LogicalPlan] {
- private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
/**
@@ -56,8 +55,8 @@ case class SubmarineRowFilterExtension(spark: SparkSession) extends Rule[Logical
val ugi = UserGroupInformation.getCurrentUser
val request = new RangerSparkAccessRequest(resource, ugi.getShortUserName,
ugi.getGroupNames.toSet, SparkObjectType.TABLE.toString, SparkAccessType.SELECT,
- sparkPlugin.getClusterName)
- val result = sparkPlugin.evalRowFilterPolicies(request, auditHandler)
+ RangerSparkPlugin.getClusterName)
+ val result = RangerSparkPlugin.evalRowFilterPolicies(request, auditHandler)
if (isRowFilterEnabled(result)) {
val condition = spark.sessionState.sqlParser.parseExpression(result.getFilterExpr)
val analyzed = spark.sessionState.analyzer.execute(Filter(condition, plan))
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala
index 45c1633..70fa093 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkAuthorizer.scala
@@ -43,7 +43,6 @@ import org.apache.submarine.spark.security.SparkOperationType.SparkOperationType
object RangerSparkAuthorizer {
private val LOG = LogFactory.getLog(this.getClass.getSimpleName.stripSuffix("$"))
- private val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
private def currentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
@@ -68,7 +67,7 @@ object RangerSparkAuthorizer {
if (inputs.isEmpty && opType == SparkOperationType.SHOWDATABASES) {
val resource = new RangerSparkResource(SparkObjectType.DATABASE, None)
requests += new RangerSparkAccessRequest(resource, user, groups, opType.toString,
- SparkAccessType.USE, sparkPlugin.getClusterName)
+ SparkAccessType.USE, RangerSparkPlugin.getClusterName)
}
def addAccessRequest(objs: Seq[SparkPrivilegeObject], isInput: Boolean): Unit = {
@@ -89,7 +88,7 @@ object RangerSparkAuthorizer {
if (accessType != SparkAccessType.NONE && !requests.exists(
o => o.getSparkAccessType == accessType && o.getResource == resource)) {
requests += new RangerSparkAccessRequest(resource, user, groups, opType.toString,
- accessType, sparkPlugin.getClusterName)
+ accessType, RangerSparkPlugin.getClusterName)
}
}
}
@@ -102,7 +101,7 @@ object RangerSparkAuthorizer {
val resource = request.getResource.asInstanceOf[RangerSparkResource]
if (resource.getObjectType == SparkObjectType.COLUMN &&
StringUtils.contains(resource.getColumn, ",")) {
- resource.setServiceDef(sparkPlugin.getServiceDef)
+ resource.setServiceDef(RangerSparkPlugin.getServiceDef)
val colReqs: JList[RangerAccessRequest] = resource.getColumn.split(",")
.filter(StringUtils.isNotBlank).map { c =>
val colRes = new RangerSparkResource(SparkObjectType.COLUMN,
@@ -111,7 +110,7 @@ object RangerSparkAuthorizer {
colReq.setResource(colRes)
colReq.asInstanceOf[RangerAccessRequest]
}.toList.asJava
- val colResults = sparkPlugin.isAccessAllowed(colReqs, auditHandler)
+ val colResults = RangerSparkPlugin.isAccessAllowed(colReqs, auditHandler)
if (colResults != null) {
for (c <- colResults.asScala) {
if (c != null && !c.getIsAllowed) {
@@ -121,7 +120,7 @@ object RangerSparkAuthorizer {
}
}
} else {
- val result = sparkPlugin.isAccessAllowed(request, auditHandler)
+ val result = RangerSparkPlugin.isAccessAllowed(request, auditHandler)
if (result != null && !result.getIsAllowed) {
throw new SparkAccessControlException(s"Permission denied: user [$user] does not" +
s" have [${request.getSparkAccessType}] privilege on [${resource.getAsString}]")
@@ -141,8 +140,8 @@ object RangerSparkAuthorizer {
createSparkResource(obj) match {
case Some(resource) =>
val request =
- new RangerSparkAccessRequest(resource, user, groups, sparkPlugin.getClusterName)
- val result = sparkPlugin.isAccessAllowed(request)
+ new RangerSparkAccessRequest(resource, user, groups, RangerSparkPlugin.getClusterName)
+ val result = RangerSparkPlugin.isAccessAllowed(request)
if (request == null) {
LOG.error("Internal error: null RangerAccessResult received back from isAccessAllowed")
false
@@ -255,7 +254,7 @@ object RangerSparkAuthorizer {
obj.getColumns.mkString(","))
case _ => null
}
- if (resource != null) resource.setServiceDef(sparkPlugin.getServiceDef)
+ if (resource != null) resource.setServiceDef(RangerSparkPlugin.getServiceDef)
resource
}
@@ -299,6 +298,6 @@ object RangerSparkAuthorizer {
}
private def isPathInFSScheme(objectName: String): Boolean = {
- objectName.nonEmpty && sparkPlugin.fsScheme.exists(objectName.startsWith)
+ objectName.nonEmpty && RangerSparkPlugin.fsScheme.exists(objectName.startsWith)
}
}
diff --git a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala
index 58f9f8d..3e34ab7 100644
--- a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala
+++ b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkPlugin.scala
@@ -25,10 +25,13 @@ import org.apache.commons.logging.LogFactory
import org.apache.ranger.authorization.hadoop.config.RangerConfiguration
import org.apache.ranger.plugin.service.RangerBasePlugin
-class RangerSparkPlugin private extends RangerBasePlugin("spark", "sparkSql") {
- import RangerSparkPlugin._
+object RangerSparkPlugin extends RangerBasePlugin("spark", "sparkSql") {
- private val LOG = LogFactory.getLog(classOf[RangerSparkPlugin])
+ private val LOG = LogFactory.getLog(RangerSparkPlugin.getClass)
+
+ private val rangerConf: RangerConfiguration = RangerConfiguration.getInstance
+ val showColumnsOption: String = rangerConf.get(
+ "xasecure.spark.describetable.showcolumns.authorization.option", "NONE")
lazy val fsScheme: Array[String] = RangerConfiguration.getInstance()
.get("ranger.plugin.spark.urlauth.filesystem.schemes", "hdfs:,file:")
@@ -50,29 +53,7 @@ class RangerSparkPlugin private extends RangerBasePlugin("spark", "sparkSql") {
}
LOG.info("Policy cache directory successfully set to " + cacheDir.getAbsolutePath)
}
-}
-object RangerSparkPlugin {
-
- private val rangerConf: RangerConfiguration = RangerConfiguration.getInstance
-
- val showColumnsOption: String = rangerConf.get(
- "xasecure.spark.describetable.showcolumns.authorization.option", "NONE")
-
- def build(): Builder = new Builder
-
- class Builder {
-
- @volatile private var sparkPlugin: RangerSparkPlugin = _
-
- def getOrCreate(): RangerSparkPlugin = RangerSparkPlugin.synchronized {
- if (sparkPlugin == null) {
- sparkPlugin = new RangerSparkPlugin
- sparkPlugin.init()
- sparkPlugin
- } else {
- sparkPlugin
- }
- }
- }
+ init()
}
+
diff --git a/submarine-security/spark-security/src/test/resources/log4j.properties b/submarine-security/spark-security/src/test/resources/log4j.properties
index d3d93a7..d633dc8 100644
--- a/submarine-security/spark-security/src/test/resources/log4j.properties
+++ b/submarine-security/spark-security/src/test/resources/log4j.properties
@@ -21,3 +21,5 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=OFF
diff --git a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala
index 14bca20..9e2b9bd 100644
--- a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala
+++ b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RangerAdminClientImpl.scala
@@ -32,17 +32,22 @@ class RangerAdminClientImpl extends RangerAdminRESTClient {
private val cacheFilename = "sparkSql_hive_jenkins.json"
private val gson =
new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create
-
- override def init(serviceName: String, appId: String, configPropertyPrefix: String): Unit = {}
+ private var policies: ServicePolicies = _
+
+ override def init(serviceName: String, appId: String, configPropertyPrefix: String): Unit = {
+ if (policies == null) {
+ val basedir = this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath
+ val cachePath = FileSystems.getDefault.getPath(basedir, cacheFilename)
+ LOG.info("Reading policies from " + cachePath)
+ val bytes = Files.readAllBytes(cachePath)
+ policies = gson.fromJson(new String(bytes), classOf[ServicePolicies])
+ }
+ }
override def getServicePoliciesIfUpdated(
lastKnownVersion: Long,
lastActivationTimeInMillis: Long): ServicePolicies = {
- val basedir = this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath
- val cachePath = FileSystems.getDefault.getPath(basedir, cacheFilename)
- LOG.info("Reading policies from " + cachePath)
- val bytes = Files.readAllBytes(cachePath)
- gson.fromJson(new String(bytes), classOf[ServicePolicies])
+ policies
}
override def grantAccess(request: GrantRevokeRequest): Unit = {}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org