You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ranger.apache.org by me...@apache.org on 2018/12/19 11:50:55 UTC
[ranger] 05/39: RANGER-2222:Apache RangerKafkaPlugin support to
handle Kafka Cluster as a new resource
This is an automated email from the ASF dual-hosted git repository.
mehul pushed a commit to branch ranger-1.1
in repository https://gitbox.apache.org/repos/asf/ranger.git
commit 78064a2a3bb0512f6d10693fca21b883f272c227
Author: rmani <rm...@hortonworks.com>
AuthorDate: Mon Oct 8 12:09:34 2018 -0700
RANGER-2222:Apache RangerKafkaPlugin support to handle Kafka Cluster as a new resource
Signed-off-by: rmani <rm...@hortonworks.com>
---
.../service-defs/ranger-servicedef-kafka.json | 49 ++-
.../kafka/authorizer/RangerKafkaAuditHandler.java | 74 ++++
.../kafka/authorizer/RangerKafkaAuthorizer.java | 16 +-
.../authorizer/KafkaRangerAuthorizerGSSTest.java | 1 -
.../authorizer/KafkaRangerTopicCreationTest.java | 191 +++++++++++
.../src/test/resources/kafka-policies.json | 198 ++++++++++-
.../src/test/resources/kafka_kerberos.jaas | 8 +-
.../optimized/current/ranger_core_db_mysql.sql | 1 +
.../optimized/current/ranger_core_db_oracle.sql | 1 +
.../optimized/current/ranger_core_db_postgres.sql | 1 +
.../current/ranger_core_db_sqlanywhere.sql | 2 +
.../optimized/current/ranger_core_db_sqlserver.sql | 1 +
.../PatchForKafkaServiceDefUpdate_J10025.java | 381 +++++++++++++++++++++
src/main/assembly/plugin-kafka.xml | 1 -
14 files changed, 900 insertions(+), 25 deletions(-)
diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index ca3e0fe..7e91aab 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -23,13 +23,15 @@
"validationMessage":"",
"uiHint":"",
"label":"Topic",
- "description":"Topic"
+ "description":"Topic",
+ "accessTypeRestrictions": ["create", "delete", "configure", "alter_configs", "describe", "describe_configs", "consume", "publish", "kafka_admin"]
},
{
"itemId":2,
"name":"transactionalid",
"type":"string",
"level":1,
+ "mandatory":true,
"excludesSupported":true,
"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
"matcherOptions":{
@@ -37,9 +39,41 @@
"ignoreCase":true
},
"label":"Transactional Id",
- "description":"Transactional Id"
+ "description":"Transactional Id",
+ "accessTypeRestrictions": ["publish", "describe"]
+ },
+ {
+ "itemId":3,
+ "name":"cluster",
+ "type":"string",
+ "level":1,
+ "mandatory":true,
+ "excludesSupported":true,
+ "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+ "matcherOptions":{
+ "wildCard":true,
+ "ignoreCase":true
+ },
+ "label":"Cluster",
+ "description":"Cluster",
+ "accessTypeRestrictions": ["configure", "alter_configs", "describe", "describe_configs", "kafka_admin", "idempotent_write"]
+ },
+ {
+ "itemId":4,
+ "name":"delegationtoken",
+ "type":"string",
+ "level":1,
+ "mandatory":true,
+ "excludesSupported":true,
+ "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+ "matcherOptions":{
+ "wildCard":true,
+ "ignoreCase":true
+ },
+ "label":"Delegation Token",
+ "description":"Delegation Token",
+ "accessTypeRestrictions": ["describe"]
}
-
],
"accessTypes":[
{
@@ -49,7 +83,6 @@
"impliedGrants":[
"describe"
]
-
},
{
"itemId":2,
@@ -58,7 +91,6 @@
"impliedGrants":[
"describe"
]
-
},
{
"itemId":5,
@@ -67,7 +99,6 @@
"impliedGrants":[
"describe"
]
-
},
{
"itemId":6,
@@ -99,7 +130,6 @@
"create",
"delete"
]
-
},
{
"itemId":10,
@@ -150,13 +180,10 @@
"mandatory":false,
"label":"Ranger Plugin SSL CName"
}
-
],
"enums":[
-
],
"contextEnrichers":[
-
],
"policyConditions":[
{
@@ -164,7 +191,6 @@
"name":"ip-range",
"evaluator":"org.apache.ranger.plugin.conditionevaluator.RangerIpMatcher",
"evaluatorOptions":{
-
},
"validationRegEx":"",
"validationMessage":"",
@@ -172,6 +198,5 @@
"label":"IP Address Range",
"description":"IP Address Range"
}
-
]
}
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
new file mode 100644
index 0000000..ee50e95
--- /dev/null
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+
+public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler {
+ private static final Log LOG = LogFactory.getLog(RangerKafkaAuditHandler.class);
+
+ private AuthzAuditEvent auditEvent = null;
+
+ public RangerKafkaAuditHandler(){
+ }
+
+ @Override
+ public void processResult(RangerAccessResult result) {
+ // If Cluster Resource Level Topic Creation is not Allowed we don't audit.
+ // Subsequent call from Kafka for Topic Creation at Topic resource Level will be audited.
+ if (!isAuditingNeeded(result)) {
+ return;
+ }
+ auditEvent = super.getAuthzEvents(result);
+ }
+
+ private boolean isAuditingNeeded(final RangerAccessResult result) {
+ boolean ret = true;
+ boolean isAllowed = result.getIsAllowed();
+ RangerAccessRequest request = result.getAccessRequest();
+ RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource();
+ String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER);
+ if (resourceName != null) {
+ if (request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE) && !isAllowed) {
+ ret = false;
+ }
+ }
+ return ret;
+ }
+
+ public void flushAudit() {
+ if(LOG.isDebugEnabled()) {
+ LOG.info("==> RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent + ")");
+ }
+ if (auditEvent != null) {
+ super.logAuthzAudit(auditEvent);
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.info("<== RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent + ")");
+ }
+ }
+}
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index eab869a..8a661d8 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -40,7 +40,6 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.kerberos.KerberosLogin;
import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
@@ -59,6 +58,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
public static final String KEY_CLUSTER = "cluster";
public static final String KEY_CONSUMER_GROUP = "consumer_group";
public static final String KEY_TRANSACTIONALID = "transactionalid";
+ public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
public static final String ACCESS_TYPE_READ = "consume";
public static final String ACCESS_TYPE_WRITE = "publish";
@@ -72,6 +72,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
private static volatile RangerBasePlugin rangerPlugin = null;
+ RangerKafkaAuditHandler auditHandler = null;
public RangerKafkaAuthorizer() {
}
@@ -115,7 +116,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
}
logger.info("Calling plugin.init()");
rangerPlugin.init();
- RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+ auditHandler = new RangerKafkaAuditHandler();
rangerPlugin.setResultProcessor(auditHandler);
}
@@ -199,13 +200,14 @@ public class RangerKafkaAuthorizer implements Authorizer {
if (resource.resourceType().equals(Topic$.MODULE$)) {
rangerResource.setValue(KEY_TOPIC, resource.name());
- } else if (resource.resourceType().equals(Cluster$.MODULE$)) { //NOPMD
- // CLUSTER should go as null
- // rangerResource.setValue(KEY_CLUSTER, resource.name());
+ } else if (resource.resourceType().equals(Cluster$.MODULE$)) {
+ rangerResource.setValue(KEY_CLUSTER, resource.name());
} else if (resource.resourceType().equals(Group$.MODULE$)) {
rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
- rangerResource.setValue(KEY_TRANSACTIONALID,resource.name());
+ rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
+ } else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
+ rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
} else {
logger.fatal("Unsupported resourceType=" + resource.resourceType());
validationFailed = true;
@@ -228,6 +230,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
} catch (Throwable t) {
logger.error("Error while calling isAccessAllowed(). request="
+ rangerRequest, t);
+ } finally {
+ auditHandler.flushAudit();
}
}
RangerPerfTracer.log(perf);
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index c1386fe..43e88b5 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -308,7 +308,6 @@ public class KafkaRangerAuthorizerGSSTest {
producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
producer.flush();
record.get();
- Assert.fail("Authorization failure expected");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
}
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
new file mode 100644
index 0000000..a12817e
--- /dev/null
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class KafkaRangerTopicCreationTest {
+ private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
+
+ private static KafkaServerStartable kafkaServer;
+ private static TestingServer zkServer;
+ private static int port;
+ private static Path tempDir;
+ private static SimpleKdcServer kerbyServer;
+
+ @org.junit.BeforeClass
+ public static void setup() throws Exception {
+ String basedir = System.getProperty("basedir");
+ if (basedir == null) {
+ basedir = new File(".").getCanonicalPath();
+ }
+ System.out.println("Base Dir " + basedir);
+
+ configureKerby(basedir);
+
+ // JAAS Config file - We need to point to the correct keytab files
+ Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas");
+ String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+ content = content.replaceAll("<basedir>", basedir);
+ //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + address);
+
+ Path path2 = FileSystems.getDefault().getPath(basedir, "/target/test-classes/kafka_kerberos.jaas");
+ Files.write(path2, content.getBytes(StandardCharsets.UTF_8));
+
+ System.setProperty("java.security.auth.login.config", path2.toString());
+
+ // Set up Zookeeper to require SASL
+ Map<String,Object> zookeeperProperties = new HashMap<>();
+ zookeeperProperties.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+ zookeeperProperties.put("requireClientAuthScheme", "sasl");
+ zookeeperProperties.put("jaasLoginRenew", "3600000");
+
+ InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1,-1, -1, zookeeperProperties, "localhost");
+
+ zkServer = new TestingServer(instanceSpec, true);
+
+ // Get a random port
+ ServerSocket serverSocket = new ServerSocket(0);
+ port = serverSocket.getLocalPort();
+ serverSocket.close();
+
+ tempDir = Files.createTempDirectory("kafka");
+
+ LOG.info("Port is {}", port);
+ LOG.info("Temporary directory is at {}", tempDir);
+
+ final Properties props = new Properties();
+ props.put("broker.id", 1);
+ props.put("host.name", "localhost");
+ props.put("port", port);
+ props.put("log.dir", tempDir.toString());
+ props.put("zookeeper.connect", zkServer.getConnectString());
+ props.put("replica.socket.timeout.ms", "1500");
+ props.put("controlled.shutdown.enable", Boolean.TRUE.toString());
+ // Enable SASL_PLAINTEXT
+ props.put("listeners", "SASL_PLAINTEXT://localhost:" + port);
+ props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+ props.put("sasl.enabled.mechanisms", "GSSAPI");
+ props.put("sasl.mechanism.inter.broker.protocol", "GSSAPI");
+ props.put("sasl.kerberos.service.name", "kafka");
+ props.put("offsets.topic.replication.factor", (short) 1);
+ props.put("offsets.topic.num.partitions", 1);
+
+ // Plug in Apache Ranger authorizer
+ props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
+
+ // Create users for testing
+ UserGroupInformation.createUserForTesting("client@kafka.apache.org", new String[] {"public"});
+ UserGroupInformation.createUserForTesting("kafka/localhost@kafka.apache.org", new String[] {"IT"});
+
+ KafkaConfig config = new KafkaConfig(props);
+ kafkaServer = new KafkaServerStartable(config);
+ kafkaServer.startup();
+ }
+
+ private static void configureKerby(String baseDir) throws Exception {
+
+ //System.setProperty("sun.security.krb5.debug", "true");
+ System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf");
+
+ kerbyServer = new SimpleKdcServer();
+
+ kerbyServer.setKdcRealm("kafka.apache.org");
+ kerbyServer.setAllowUdp(false);
+ kerbyServer.setWorkDir(new File(baseDir + "/target"));
+
+ kerbyServer.init();
+
+ // Create principals
+ String zookeeper = "zookeeper/localhost@kafka.apache.org";
+ String kafka = "kafka/localhost@kafka.apache.org";
+ String client = "client@kafka.apache.org";
+
+ kerbyServer.createPrincipal(zookeeper, "zookeeper");
+ File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
+ kerbyServer.exportPrincipal(zookeeper, keytabFile);
+
+ kerbyServer.createPrincipal(kafka, "kafka");
+ keytabFile = new File(baseDir + "/target/kafka.keytab");
+ kerbyServer.exportPrincipal(kafka, keytabFile);
+
+ kerbyServer.createPrincipal(client, "client");
+ keytabFile = new File(baseDir + "/target/client.keytab");
+ kerbyServer.exportPrincipal(client, keytabFile);
+
+ kerbyServer.start();
+ }
+
+ @org.junit.AfterClass
+ public static void cleanup() throws Exception {
+ if (kafkaServer != null) {
+ kafkaServer.shutdown();
+ }
+ if (zkServer != null) {
+ zkServer.stop();
+ }
+ if (kerbyServer != null) {
+ kerbyServer.stop();
+ }
+ }
+
+ @Test
+ public void testCreateTopic() throws Exception {
+ final String topic = "test";
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port);
+ properties.put("client.id", "test-consumer-id");
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ AdminClient client = KafkaAdminClient.create(properties);
+ CreateTopicsResult result = client.createTopics(Arrays.asList(new NewTopic(topic, 1, (short) 1)));
+ result.values().get(topic).get();
+ for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
+ System.out.println("Create Topic : " + entry.getKey() + " " +
+ "isCancelled : " + entry.getValue().isCancelled() + " " +
+ "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " +
+ "isDone : " + entry.getValue().isDone());
+ }
+ }
+}
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json
index 0c07604..e4f5db1 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -6,6 +6,84 @@
"policies": [
{
"service": "cl1_kafka",
+ "name": "all - cluster",
+ "policyType": 0,
+ "description": "Policy for all - cluster",
+ "isAuditEnabled": true,
+ "resources": {
+ "cluster": {
+ "values": [
+ "*"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [
+ {
+ "accesses": [
+ {
+ "type": "publish",
+ "isAllowed": true
+ },
+ {
+ "type": "consume",
+ "isAllowed": true
+ },
+ {
+ "type": "configure",
+ "isAllowed": true
+ },
+ {
+ "type": "describe",
+ "isAllowed": true
+ },
+ {
+ "type": "create",
+ "isAllowed": true
+ },
+ {
+ "type": "delete",
+ "isAllowed": true
+ },
+ {
+ "type": "kafka_admin",
+ "isAllowed": true
+ },
+ {
+ "type": "idempotent_write",
+ "isAllowed": true
+ },
+ {
+ "type": "describe_configs",
+ "isAllowed": true
+ },
+ {
+ "type": "alter_configs",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "admin","kafka"
+ ],
+ "groups": [
+ "IT"
+ ],
+ "conditions": [],
+ "delegateAdmin": true
+ }
+ ],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "id": 40,
+ "isEnabled": true,
+ "version": 2
+ },
+ {
+ "service": "cl1_kafka",
"name": "all - topic",
"policyType": 0,
"description": "Policy for all - topic",
@@ -64,7 +142,7 @@
}
],
"users": [
- "admin","kafka"
+ "admin","kafka", "client"
],
"groups": [
"IT"
@@ -243,6 +321,84 @@
"id": 30,
"isEnabled": true,
"version": 1
+ },
+ {
+ "service": "cl1_kafka",
+ "name": "DelegationToken Policy",
+ "policyType": 0,
+ "description": "DelegationTokenPolicy",
+ "isAuditEnabled": true,
+ "resources": {
+ "delegationtoken": {
+ "values": [
+ "*"
+ ],
+ "isExcludes": false,
+ "isRecursive": false
+ }
+ },
+ "policyItems": [
+ {
+ "accesses": [
+ {
+ "type": "publish",
+ "isAllowed": true
+ },
+ {
+ "type": "consume",
+ "isAllowed": true
+ },
+ {
+ "type": "configure",
+ "isAllowed": true
+ },
+ {
+ "type": "describe",
+ "isAllowed": true
+ },
+ {
+ "type": "create",
+ "isAllowed": true
+ },
+ {
+ "type": "delete",
+ "isAllowed": true
+ },
+ {
+ "type": "kafka_admin",
+ "isAllowed": true
+ },
+ {
+ "type": "idempotent_write",
+ "isAllowed": true
+ },
+ {
+ "type": "describe_configs",
+ "isAllowed": true
+ },
+ {
+ "type": "alter_configs",
+ "isAllowed": true
+ }
+ ],
+ "users": [
+ "admin","kafka", "client"
+ ],
+ "groups": [
+ "IT"
+ ],
+ "conditions": [],
+ "delegateAdmin": true
+ }
+ ],
+ "denyPolicyItems": [],
+ "allowExceptions": [],
+ "denyExceptions": [],
+ "dataMaskPolicyItems": [],
+ "rowFilterPolicyItems": [],
+ "id": 31,
+ "isEnabled": true,
+ "version": 2
}
],
"serviceDef": {
@@ -322,6 +478,46 @@
"uiHint":"",
"label":"Transactional Id",
"description":"Transactional Id"
+ },
+ {
+ "itemId":3,
+ "name":"cluster",
+ "type":"string",
+ "level":1,
+ "mandatory":true,
+ "lookupSupported":false,
+ "recursiveSupported":false,
+ "excludesSupported":true,
+ "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+ "matcherOptions":{
+ "wildCard":true,
+ "ignoreCase":true
+ },
+ "validationRegEx":"",
+ "validationMessage":"",
+ "uiHint":"",
+ "label":"Cluster",
+ "description":"Cluster"
+ },
+ {
+ "itemId":4,
+ "name":"delegationtoken",
+ "type":"string",
+ "level":1,
+ "mandatory":true,
+ "lookupSupported":false,
+ "recursiveSupported":false,
+ "excludesSupported":true,
+ "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+ "matcherOptions":{
+ "wildCard":true,
+ "ignoreCase":true
+ },
+ "validationRegEx":"",
+ "validationMessage":"",
+ "uiHint":"",
+ "label":"Delegation Token",
+ "description":"Delegation Token"
}
],
"accessTypes": [
diff --git a/plugin-kafka/src/test/resources/kafka_kerberos.jaas b/plugin-kafka/src/test/resources/kafka_kerberos.jaas
index 1de804b..2e83c7c 100644
--- a/plugin-kafka/src/test/resources/kafka_kerberos.jaas
+++ b/plugin-kafka/src/test/resources/kafka_kerberos.jaas
@@ -1,20 +1,20 @@
Server {
- com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+ com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
keyTab="<basedir>/target/zookeeper.keytab" storeKey=true principal="zookeeper/localhost";
};
KafkaServer {
- com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+ com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
keyTab="<basedir>/target/kafka.keytab" storeKey=true principal="kafka/localhost";
};
Client {
- com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+ com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
keyTab="<basedir>/target/kafka.keytab" storeKey=true principal="kafka/localhost";
};
KafkaClient {
- com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+ com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
keyTab="<basedir>/target/client.keytab" storeKey=true principal="client";
};
diff --git a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
index f743a65..0066339 100644
--- a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
+++ b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
@@ -1430,4 +1430,5 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10019',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10020',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
diff --git a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
index a7d6f73..de12102 100644
--- a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
+++ b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
@@ -1575,5 +1575,6 @@ INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,act
INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10019',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10020',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10025',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
+INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10018',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'JAVA_PATCHES',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
commit;
diff --git a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
index 8969c26..35a133a 100644
--- a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
+++ b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
@@ -1519,6 +1519,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10019',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10020',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
DROP VIEW IF EXISTS vx_trx_log;
diff --git a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
index 9dc2515..43da93f 100644
--- a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
+++ b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
@@ -1869,6 +1869,8 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
GO
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
GO
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018,CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+GO
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
GO
exit
diff --git a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
index 8351c70..a1ac530 100644
--- a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
+++ b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
@@ -3289,6 +3289,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10019',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10020',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
GO
CREATE VIEW [dbo].[vx_trx_log] AS
diff --git a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
new file mode 100644
index 0000000..0ef1544
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.patch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.biz.RangerBizUtil;
+import org.apache.ranger.biz.ServiceDBStore;
+import org.apache.ranger.common.GUIDUtil;
+import org.apache.ranger.common.JSONUtil;
+import org.apache.ranger.common.RangerValidatorFactory;
+import org.apache.ranger.common.StringUtil;
+import org.apache.ranger.db.RangerDaoManager;
+import org.apache.ranger.entity.XXAccessTypeDef;
+import org.apache.ranger.entity.XXPolicy;
+import org.apache.ranger.entity.XXPolicyItem;
+import org.apache.ranger.entity.XXPolicyItemAccess;
+import org.apache.ranger.entity.XXPolicyItemUserPerm;
+import org.apache.ranger.entity.XXPolicyResource;
+import org.apache.ranger.entity.XXPolicyResourceMap;
+import org.apache.ranger.entity.XXPortalUser;
+import org.apache.ranger.entity.XXResourceDef;
+import org.apache.ranger.entity.XXService;
+import org.apache.ranger.entity.XXServiceDef;
+import org.apache.ranger.entity.XXUser;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
+import org.apache.ranger.plugin.model.RangerServiceDef;
+import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
+import org.apache.ranger.plugin.model.validation.RangerValidator.Action;
+import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.service.RangerPolicyService;
+import org.apache.ranger.service.XPermMapService;
+import org.apache.ranger.service.XPolicyService;
+import org.apache.ranger.util.CLIUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class PatchForKafkaServiceDefUpdate_J10025 extends BaseLoader {
+ private static final Logger logger = Logger.getLogger(PatchForKafkaServiceDefUpdate_J10025.class);
+ private static final List<String> POLICY_NAMES = new ArrayList<>(Arrays.asList("all - cluster", "all - delegationtoken"));
+ private static final String LOGIN_ID_ADMIN = "admin";
+ private static final String KAFKA_RESOURCE_CLUSTER = "cluster";
+ private static final String KAFKA_RESOURCE_DELEGATIONTOKEN = "delegationtoken";
+
+ private static final List<String> DEFAULT_POLICY_USERS = new ArrayList<>(Arrays.asList("kafka","rangerlookup"));
+
+
+ public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME = "kafka";
+ public static final String CLUSTER_RESOURCE_NAME ="cluster";
+
+
+ @Autowired
+ RangerDaoManager daoMgr;
+
+ @Autowired
+ ServiceDBStore svcDBStore;
+
+ @Autowired
+ JSONUtil jsonUtil;
+
+ @Autowired
+ RangerPolicyService policyService;
+
+ @Autowired
+ StringUtil stringUtil;
+
+ @Autowired
+ GUIDUtil guidUtil;
+
+ @Autowired
+ XPolicyService xPolService;
+
+ @Autowired
+ XPermMapService xPermMapService;
+
+ @Autowired
+ RangerBizUtil bizUtil;
+
+ @Autowired
+ RangerValidatorFactory validatorFactory;
+
+ @Autowired
+ ServiceDBStore svcStore;
+
+ public static void main(String[] args) {
+ logger.info("main()");
+ try {
+ PatchForKafkaServiceDefUpdate_J10025 loader = (PatchForKafkaServiceDefUpdate_J10025) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10025.class);
+ loader.init();
+ while (loader.isMoreToProcess()) {
+ loader.load();
+ }
+ logger.info("Load complete. Exiting!!!");
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("Error loading", e);
+ System.exit(1);
+ }
+ }
+
+ @Override
+ public void init() throws Exception {
+ // Do Nothing
+ }
+
+ @Override
+ public void execLoad() {
+ logger.info("==> PatchForKafkaServiceDefUpdate_J10025.execLoad()");
+ try {
+ updateKafkaServiceDef();
+ } catch (Exception e) {
+ logger.error("Error while applying PatchForKafkaServiceDefUpdate_J10025...", e);
+ }
+ logger.info("<== PatchForKafkaServiceDefUpdate_J10025.execLoad()");
+ }
+
+ @Override
+ public void printStats() {
+ logger.info("PatchForKafkaServiceDefUpdate_J10025 ");
+ }
+
+ private void updateKafkaServiceDef(){
+ RangerServiceDef ret = null;
+ RangerServiceDef embeddedKafkaServiceDef = null;
+ RangerServiceDef dbKafkaServiceDef = null;
+ List<RangerServiceDef.RangerResourceDef> embeddedKafkaResourceDefs = null;
+ List<RangerServiceDef.RangerAccessTypeDef> embeddedKafkaAccessTypes = null;
+ XXServiceDef xXServiceDefObj = null;
+ try{
+ embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+ if(embeddedKafkaServiceDef!=null){
+
+ xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+ Map<String, String> serviceDefOptionsPreUpdate=null;
+ String jsonStrPreUpdate=null;
+ if(xXServiceDefObj!=null) {
+ jsonStrPreUpdate=xXServiceDefObj.getDefOptions();
+ serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate);
+ xXServiceDefObj=null;
+ }
+ dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+
+ if(dbKafkaServiceDef!=null){
+ embeddedKafkaResourceDefs = embeddedKafkaServiceDef.getResources();
+ embeddedKafkaAccessTypes = embeddedKafkaServiceDef.getAccessTypes();
+
+ if (checkNewKafkaresourcePresent(embeddedKafkaResourceDefs)) {
+ // This is to check if CLUSTER resource is added to the resource definition, if so update the resource def and accessType def
+ if (embeddedKafkaResourceDefs != null) {
+ dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs);
+ }
+ if (embeddedKafkaAccessTypes != null) {
+ if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString())) {
+ dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes);
+ }
+ }
+ }
+
+ RangerServiceDefValidator validator = validatorFactory.getServiceDefValidator(svcStore);
+ validator.validate(dbKafkaServiceDef, Action.UPDATE);
+
+ ret = svcStore.updateServiceDef(dbKafkaServiceDef);
+ if(ret==null){
+ logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+ throw new RuntimeException("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+ }
+ xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+ if(xXServiceDefObj!=null) {
+ String jsonStrPostUpdate=xXServiceDefObj.getDefOptions();
+ Map<String, String> serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate);
+ if (serviceDefOptionsPostUpdate != null && serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) {
+ if(serviceDefOptionsPreUpdate == null || !serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) {
+ String preUpdateValue = serviceDefOptionsPreUpdate == null ? null : serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+ if (preUpdateValue == null) {
+ serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+ } else {
+ serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES, preUpdateValue);
+ }
+ xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate));
+ daoMgr.getXXServiceDef().update(xXServiceDefObj);
+ }
+ }
+ createDefaultPolicyForNewResources();
+ }
+ }
+ }
+ }catch(Exception e)
+ {
+ logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def", e);
+ }
+ }
+
+ private boolean checkNewKafkaresourcePresent(List<RangerServiceDef.RangerResourceDef> resourceDefs) {
+ boolean ret = false;
+ for(RangerServiceDef.RangerResourceDef resourceDef : resourceDefs) {
+ if (CLUSTER_RESOURCE_NAME.equals(resourceDef.getName()) ) {
+ ret = true ;
+ break;
+ }
+ }
+ return ret;
+ }
+
+ private String mapToJsonString(Map<String, String> map) {
+ String ret = null;
+ if(map != null) {
+ try {
+ ret = jsonUtil.readMapToString(map);
+ } catch(Exception excp) {
+ logger.warn("mapToJsonString() failed to convert map: " + map, excp);
+ }
+ }
+ return ret;
+ }
+
+ protected Map<String, String> jsonStringToMap(String jsonStr) {
+ Map<String, String> ret = null;
+ if(!StringUtils.isEmpty(jsonStr)) {
+ try {
+ ret = jsonUtil.jsonToMap(jsonStr);
+ } catch(Exception excp) {
+ // fallback to earlier format: "name1=value1;name2=value2"
+ for(String optionString : jsonStr.split(";")) {
+ if(StringUtils.isEmpty(optionString)) {
+ continue;
+ }
+ String[] nvArr = optionString.split("=");
+ String name = (nvArr != null && nvArr.length > 0) ? nvArr[0].trim() : null;
+ String value = (nvArr != null && nvArr.length > 1) ? nvArr[1].trim() : null;
+ if(StringUtils.isEmpty(name)) {
+ continue;
+ }
+ if(ret == null) {
+ ret = new HashMap<String, String>();
+ }
+ ret.put(name, value);
+ }
+ }
+ }
+ return ret;
+ }
+
+ private void createDefaultPolicyForNewResources() {
+ logger.info("==> createDefaultPolicyForNewResources ");
+ XXPortalUser xxPortalUser = daoMgr.getXXPortalUser().findByLoginId(LOGIN_ID_ADMIN);
+ Long currentUserId = xxPortalUser.getId();
+
+ XXServiceDef xXServiceDefObj = daoMgr.getXXServiceDef()
+ .findByName(EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+ if (xXServiceDefObj == null) {
+ logger.debug("ServiceDef not fount with name :" + EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+ return;
+ }
+
+ Long xServiceDefId = xXServiceDefObj.getId();
+ List<XXService> xxServices = daoMgr.getXXService().findByServiceDefId(xServiceDefId);
+
+ for (XXService xxService : xxServices) {
+ int resourceMapOrder = 0;
+ for (String newResource : POLICY_NAMES) {
+ XXPolicy xxPolicy = new XXPolicy();
+ xxPolicy.setName(newResource);
+ xxPolicy.setDescription(newResource);
+ xxPolicy.setService(xxService.getId());
+ xxPolicy.setPolicyPriority(RangerPolicy.POLICY_PRIORITY_NORMAL);
+ xxPolicy.setIsAuditEnabled(Boolean.TRUE);
+ xxPolicy.setIsEnabled(Boolean.TRUE);
+ xxPolicy.setPolicyType(RangerPolicy.POLICY_TYPE_ACCESS);
+ xxPolicy.setGuid(guidUtil.genGUID());
+ xxPolicy.setAddedByUserId(currentUserId);
+ xxPolicy.setUpdatedByUserId(currentUserId);
+ RangerPolicy rangerPolicy = new RangerPolicy();
+ RangerPolicyResourceSignature resourceSignature = new RangerPolicyResourceSignature(rangerPolicy);
+ xxPolicy.setResourceSignature(resourceSignature.getSignature());
+ XXPolicy createdPolicy = daoMgr.getXXPolicy().create(xxPolicy);
+
+ XXPolicyItem xxPolicyItem = new XXPolicyItem();
+ xxPolicyItem.setIsEnabled(Boolean.TRUE);
+ xxPolicyItem.setDelegateAdmin(Boolean.TRUE);
+ xxPolicyItem.setItemType(0);
+ xxPolicyItem.setOrder(0);
+ xxPolicyItem.setAddedByUserId(currentUserId);
+ xxPolicyItem.setUpdatedByUserId(currentUserId);
+ xxPolicyItem.setPolicyId(createdPolicy.getId());
+ XXPolicyItem createdXXPolicyItem = daoMgr.getXXPolicyItem().create(xxPolicyItem);
+
+ List<String> accessTypes = Arrays.asList("create", "delete", "configure", "alter_configs", "describe", "describe_configs", "consume", "publish", "kafka_admin","idempotent_write");
+ for (int i = 0; i < accessTypes.size(); i++) {
+ XXAccessTypeDef xAccTypeDef = daoMgr.getXXAccessTypeDef().findByNameAndServiceId(accessTypes.get(i),
+ xxPolicy.getService());
+ if (xAccTypeDef == null) {
+ throw new RuntimeException(accessTypes.get(i) + ": is not a valid access-type. policy='"
+ + xxPolicy.getName() + "' service='" + xxPolicy.getService() + "'");
+ }
+ XXPolicyItemAccess xPolItemAcc = new XXPolicyItemAccess();
+ xPolItemAcc.setIsAllowed(Boolean.TRUE);
+ xPolItemAcc.setType(xAccTypeDef.getId());
+ xPolItemAcc.setOrder(i);
+ xPolItemAcc.setAddedByUserId(currentUserId);
+ xPolItemAcc.setUpdatedByUserId(currentUserId);
+ xPolItemAcc.setPolicyitemid(createdXXPolicyItem.getId());
+ daoMgr.getXXPolicyItemAccess().create(xPolItemAcc);
+ }
+
+ for (int i = 0; i < DEFAULT_POLICY_USERS.size(); i++) {
+ String user = DEFAULT_POLICY_USERS.get(i);
+ if (StringUtils.isBlank(user)) {
+ continue;
+ }
+ XXUser xxUser = daoMgr.getXXUser().findByUserName(user);
+ if (xxUser == null) {
+ throw new RuntimeException(user + ": user does not exist. policy='" + xxPolicy.getName()
+ + "' service='" + xxPolicy.getService() + "' user='" + user + "'");
+ }
+ XXPolicyItemUserPerm xUserPerm = new XXPolicyItemUserPerm();
+ xUserPerm.setUserId(xxUser.getId());
+ xUserPerm.setPolicyItemId(createdXXPolicyItem.getId());
+ xUserPerm.setOrder(i);
+ xUserPerm.setAddedByUserId(currentUserId);
+ xUserPerm.setUpdatedByUserId(currentUserId);
+ daoMgr.getXXPolicyItemUserPerm().create(xUserPerm);
+ }
+
+ String policyResourceName = KAFKA_RESOURCE_CLUSTER;
+ if ("all - delegationtoken".equals(newResource)) {
+ policyResourceName = KAFKA_RESOURCE_DELEGATIONTOKEN;
+ }
+ XXResourceDef xResDef = daoMgr.getXXResourceDef().findByNameAndPolicyId(policyResourceName,
+ createdPolicy.getId());
+ if (xResDef == null) {
+ throw new RuntimeException(policyResourceName + ": is not a valid resource-type. policy='"
+ + createdPolicy.getName() + "' service='" + createdPolicy.getService() + "'");
+ }
+
+ XXPolicyResource xPolRes = new XXPolicyResource();
+
+ xPolRes.setAddedByUserId(currentUserId);
+ xPolRes.setUpdatedByUserId(currentUserId);
+ xPolRes.setIsExcludes(Boolean.FALSE);
+ xPolRes.setIsRecursive(Boolean.FALSE);
+ xPolRes.setPolicyId(createdPolicy.getId());
+ xPolRes.setResDefId(xResDef.getId());
+ xPolRes = daoMgr.getXXPolicyResource().create(xPolRes);
+
+ XXPolicyResourceMap xPolResMap = new XXPolicyResourceMap();
+ xPolResMap.setResourceId(xPolRes.getId());
+ xPolResMap.setValue("*");
+ xPolResMap.setOrder(resourceMapOrder);
+ xPolResMap.setAddedByUserId(currentUserId);
+ xPolResMap.setUpdatedByUserId(currentUserId);
+ daoMgr.getXXPolicyResourceMap().create(xPolResMap);
+ resourceMapOrder++;
+ logger.info("Creating policy for service id : " + xxService.getId());
+ }
+ }
+ logger.info("<== createDefaultPolicyForNewResources ");
+ }
+}
\ No newline at end of file
diff --git a/src/main/assembly/plugin-kafka.xml b/src/main/assembly/plugin-kafka.xml
index 97ff8ad..7c55128 100644
--- a/src/main/assembly/plugin-kafka.xml
+++ b/src/main/assembly/plugin-kafka.xml
@@ -62,7 +62,6 @@
</include>
<include>commons-lang:commons-lang</include>
<include>commons-io:commons-io</include>
- <include>com.google.guava:guava:jar:${google.guava.version}</include>
<include>org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version}
</include>
<include>org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version}