Posted to commits@ranger.apache.org by me...@apache.org on 2018/12/19 12:00:28 UTC

[ranger] 05/36: RANGER-2222: Apache RangerKafkaPlugin support to handle Kafka Cluster as a new resource

This is an automated email from the ASF dual-hosted git repository.

mehul pushed a commit to branch ranger-1.2
in repository https://gitbox.apache.org/repos/asf/ranger.git

commit 40911f639da0199c6dd66170cf0f0f0d7b5519fd
Author: rmani <rm...@hortonworks.com>
AuthorDate: Mon Oct 8 12:09:34 2018 -0700

    RANGER-2222: Apache RangerKafkaPlugin support to handle Kafka Cluster as a new resource
    
    Signed-off-by: rmani <rm...@hortonworks.com>
---
 .../service-defs/ranger-servicedef-kafka.json      |  49 ++-
 .../kafka/authorizer/RangerKafkaAuditHandler.java  |  74 ++++
 .../kafka/authorizer/RangerKafkaAuthorizer.java    |  16 +-
 .../authorizer/KafkaRangerAuthorizerGSSTest.java   |   1 -
 .../authorizer/KafkaRangerTopicCreationTest.java   | 191 +++++++++++
 .../src/test/resources/kafka-policies.json         | 198 ++++++++++-
 .../src/test/resources/kafka_kerberos.jaas         |   8 +-
 .../optimized/current/ranger_core_db_mysql.sql     |   1 +
 .../optimized/current/ranger_core_db_oracle.sql    |   1 +
 .../optimized/current/ranger_core_db_postgres.sql  |   1 +
 .../current/ranger_core_db_sqlanywhere.sql         |   2 +
 .../optimized/current/ranger_core_db_sqlserver.sql |   1 +
 .../PatchForKafkaServiceDefUpdate_J10025.java      | 381 +++++++++++++++++++++
 src/main/assembly/plugin-kafka.xml                 |   1 -
 14 files changed, 900 insertions(+), 25 deletions(-)

diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index ca3e0fe..7e91aab 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -23,13 +23,15 @@
 			"validationMessage":"",
 			"uiHint":"",
 			"label":"Topic",
-			"description":"Topic"
+			"description":"Topic",
+			"accessTypeRestrictions": ["create", "delete", "configure", "alter_configs", "describe", "describe_configs", "consume", "publish", "kafka_admin"]
 		},
 		{
 			"itemId":2,
 			"name":"transactionalid",
 			"type":"string",
 			"level":1,
+			"mandatory":true,
 			"excludesSupported":true,
 			"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
 			"matcherOptions":{
@@ -37,9 +39,41 @@
 				"ignoreCase":true
 			},
 			"label":"Transactional Id",
-			"description":"Transactional Id"
+			"description":"Transactional Id",
+			"accessTypeRestrictions": ["publish", "describe"]
+		},
+		{
+			"itemId":3,
+			"name":"cluster",
+			"type":"string",
+			"level":1,
+			"mandatory":true,
+			"excludesSupported":true,
+			"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+			"matcherOptions":{
+				"wildCard":true,
+				"ignoreCase":true
+			},
+			"label":"Cluster",
+			"description":"Cluster",
+			"accessTypeRestrictions": ["configure", "alter_configs", "describe", "describe_configs", "kafka_admin", "idempotent_write"]
+		},
+		{
+			"itemId":4,
+			"name":"delegationtoken",
+			"type":"string",
+			"level":1,
+			"mandatory":true,
+			"excludesSupported":true,
+			"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+			"matcherOptions":{
+				"wildCard":true,
+				"ignoreCase":true
+			},
+			"label":"Delegation Token",
+			"description":"Delegation Token",
+			"accessTypeRestrictions": ["describe"]
 		}
-		
 	],
 	"accessTypes":[
 		{
@@ -49,7 +83,6 @@
 			"impliedGrants":[
 				"describe"
 			]
-			
 		},
 		{
 			"itemId":2,
@@ -58,7 +91,6 @@
 			"impliedGrants":[
 				"describe"
 			]
-			
 		},
 		{
 			"itemId":5,
@@ -67,7 +99,6 @@
 			"impliedGrants":[
 				"describe"
 			]
-			
 		},
 		{
 			"itemId":6,
@@ -99,7 +130,6 @@
 				"create",
 				"delete"
 			]
-			
 		},
 		{
 			"itemId":10,
@@ -150,13 +180,10 @@
 			"mandatory":false,
 			"label":"Ranger Plugin SSL CName"
 		}
-		
 	],
 	"enums":[
-		
 	],
 	"contextEnrichers":[
-		
 	],
 	"policyConditions":[
 		{
@@ -164,7 +191,6 @@
 			"name":"ip-range",
 			"evaluator":"org.apache.ranger.plugin.conditionevaluator.RangerIpMatcher",
 			"evaluatorOptions":{
-				
 			},
 			"validationRegEx":"",
 			"validationMessage":"",
@@ -172,6 +198,5 @@
 			"label":"IP Address Range",
 			"description":"IP Address Range"
 		}
-		
 	]
 }
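
Note on the service-def change above: the new "accessTypeRestrictions" arrays limit which access types a policy may carry on each resource (for example, only "describe" on delegationtoken). A minimal sketch of reading these restrictions back from the service-def model, assuming the standard getAccessTypeRestrictions() getter on RangerServiceDef.RangerResourceDef:

    import java.util.Set;
    import org.apache.ranger.plugin.model.RangerServiceDef;

    public class PrintAccessTypeRestrictions {
        // Prints, per resource, the access types a policy on that resource may carry.
        // Sketch only: assumes serviceDef was loaded elsewhere, e.g. from
        // ranger-servicedef-kafka.json via EmbeddedServiceDefsUtil.
        public static void print(RangerServiceDef serviceDef) {
            for (RangerServiceDef.RangerResourceDef resDef : serviceDef.getResources()) {
                Set<String> restrictions = resDef.getAccessTypeRestrictions();
                System.out.println(resDef.getName() + " -> " + restrictions);
            }
        }
    }
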
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
new file mode 100644
index 0000000..ee50e95
--- /dev/null
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+
+public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler {
+    private static final Log LOG = LogFactory.getLog(RangerKafkaAuditHandler.class);
+
+    private AuthzAuditEvent auditEvent = null;
+
+    public RangerKafkaAuditHandler(){
+    }
+
+    @Override
+    public void processResult(RangerAccessResult result) {
+        // If topic creation at the cluster resource level is not allowed, we don't audit it here;
+        // the subsequent call from Kafka for topic creation at the topic resource level will be audited.
+        if (!isAuditingNeeded(result)) {
+            return;
+        }
+        auditEvent = super.getAuthzEvents(result);
+    }
+
+    private boolean isAuditingNeeded(final RangerAccessResult result) {
+        boolean ret = true;
+        boolean isAllowed = result.getIsAllowed();
+        RangerAccessRequest request = result.getAccessRequest();
+        RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource();
+        String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER);
+        if (resourceName != null) {
+            if (request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE) && !isAllowed) {
+                ret = false;
+            }
+        }
+        return ret;
+    }
+
+    public void flushAudit() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> RangerKafkaAuditHandler.flushAudit(AuditEvent: " + auditEvent + ")");
+        }
+        if (auditEvent != null) {
+            super.logAuthzAudit(auditEvent);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== RangerKafkaAuditHandler.flushAudit(AuditEvent: " + auditEvent + ")");
+        }
+    }
+}
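
The handler above defers audit logging: processResult() only captures the event, and nothing is written until flushAudit() is called. The intended call pattern, sketched below, mirrors the RangerKafkaAuthorizer change that follows (rangerPlugin and rangerRequest are assumed to be in scope):

    // Register the handler once, then flush after each authorization call.
    RangerKafkaAuditHandler auditHandler = new RangerKafkaAuditHandler();
    rangerPlugin.setResultProcessor(auditHandler);

    try {
        RangerAccessResult result = rangerPlugin.isAccessAllowed(rangerRequest);
        // act on result.getIsAllowed() ...
    } finally {
        auditHandler.flushAudit(); // writes the captured event, if any
    }
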
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index eab869a..8a661d8 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -40,7 +40,6 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.security.authenticator.LoginManager;
 import org.apache.kafka.common.security.kerberos.KerberosLogin;
 import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
@@ -59,6 +58,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	public static final String KEY_CLUSTER = "cluster";
 	public static final String KEY_CONSUMER_GROUP = "consumer_group";
 	public static final String KEY_TRANSACTIONALID = "transactionalid";
+	public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
 
 	public static final String ACCESS_TYPE_READ = "consume";
 	public static final String ACCESS_TYPE_WRITE = "publish";
@@ -72,6 +72,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 	public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
 
 	private static volatile RangerBasePlugin rangerPlugin = null;
+	RangerKafkaAuditHandler auditHandler = null;
 
 	public RangerKafkaAuthorizer() {
 	}
@@ -115,7 +116,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
 		}
 		logger.info("Calling plugin.init()");
 		rangerPlugin.init();
-		RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler();
+		auditHandler = new RangerKafkaAuditHandler();
 		rangerPlugin.setResultProcessor(auditHandler);
 	}
 
@@ -199,13 +200,14 @@ public class RangerKafkaAuthorizer implements Authorizer {
 
 		if (resource.resourceType().equals(Topic$.MODULE$)) {
 			rangerResource.setValue(KEY_TOPIC, resource.name());
-		} else if (resource.resourceType().equals(Cluster$.MODULE$)) { //NOPMD
-			// CLUSTER should go as null
-			// rangerResource.setValue(KEY_CLUSTER, resource.name());
+		} else if (resource.resourceType().equals(Cluster$.MODULE$)) {
+			rangerResource.setValue(KEY_CLUSTER, resource.name());
 		} else if (resource.resourceType().equals(Group$.MODULE$)) {
 			rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name());
 		} else if (resource.resourceType().equals(TransactionalId$.MODULE$)) {
-			rangerResource.setValue(KEY_TRANSACTIONALID,resource.name());
+			rangerResource.setValue(KEY_TRANSACTIONALID, resource.name());
+		} else if (resource.resourceType().equals(DelegationToken$.MODULE$)) {
+			rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name());
 		} else {
 			logger.fatal("Unsupported resourceType=" + resource.resourceType());
 			validationFailed = true;
@@ -228,6 +230,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
 			} catch (Throwable t) {
 				logger.error("Error while calling isAccessAllowed(). request="
 						+ rangerRequest, t);
+			} finally {
+				auditHandler.flushAudit();
 			}
 		}
 		RangerPerfTracer.log(perf);
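
For the authorizer changes above to take effect, the broker must load the Ranger plugin. A minimal sketch of the relevant broker properties, mirroring the test setup later in this patch (other settings such as listeners, SASL, and zookeeper.connect are assumed to be configured as usual):

    // Broker-side wiring (sketch): plug Ranger in as the Kafka authorizer.
    Properties props = new Properties();
    props.put("authorizer.class.name",
            "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
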
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index c1386fe..43e88b5 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -308,7 +308,6 @@ public class KafkaRangerAuthorizerGSSTest {
                 producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
             producer.flush();
             record.get();
-            Assert.fail("Authorization failure expected");
         } catch (Exception ex) {
             Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics"));
         }
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
new file mode 100644
index 0000000..a12817e
--- /dev/null
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class KafkaRangerTopicCreationTest {
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
+
+    private static KafkaServerStartable kafkaServer;
+    private static TestingServer zkServer;
+    private static int port;
+    private static Path tempDir;
+    private static SimpleKdcServer kerbyServer;
+
+    @org.junit.BeforeClass
+    public static void setup() throws Exception {
+        String basedir = System.getProperty("basedir");
+        if (basedir == null) {
+            basedir = new File(".").getCanonicalPath();
+        }
+        System.out.println("Base Dir " + basedir);
+
+        configureKerby(basedir);
+
+        // JAAS Config file - We need to point to the correct keytab files
+        Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas");
+        String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+        content = content.replaceAll("<basedir>", basedir);
+        //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + address);
+
+        Path path2 = FileSystems.getDefault().getPath(basedir, "/target/test-classes/kafka_kerberos.jaas");
+        Files.write(path2, content.getBytes(StandardCharsets.UTF_8));
+
+        System.setProperty("java.security.auth.login.config", path2.toString());
+
+        // Set up Zookeeper to require SASL
+        Map<String,Object> zookeeperProperties = new HashMap<>();
+        zookeeperProperties.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+        zookeeperProperties.put("requireClientAuthScheme", "sasl");
+        zookeeperProperties.put("jaasLoginRenew", "3600000");
+
+        InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1,-1, -1, zookeeperProperties, "localhost");
+
+        zkServer = new TestingServer(instanceSpec, true);
+
+        // Get a random port
+        ServerSocket serverSocket = new ServerSocket(0);
+        port = serverSocket.getLocalPort();
+        serverSocket.close();
+
+        tempDir = Files.createTempDirectory("kafka");
+
+        LOG.info("Port is {}", port);
+        LOG.info("Temporary directory is at {}", tempDir);
+
+        final Properties props = new Properties();
+        props.put("broker.id", 1);
+        props.put("host.name", "localhost");
+        props.put("port", port);
+        props.put("log.dir", tempDir.toString());
+        props.put("zookeeper.connect", zkServer.getConnectString());
+        props.put("replica.socket.timeout.ms", "1500");
+        props.put("controlled.shutdown.enable", Boolean.TRUE.toString());
+        // Enable SASL_PLAINTEXT
+        props.put("listeners", "SASL_PLAINTEXT://localhost:" + port);
+        props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+        props.put("sasl.enabled.mechanisms", "GSSAPI");
+        props.put("sasl.mechanism.inter.broker.protocol", "GSSAPI");
+        props.put("sasl.kerberos.service.name", "kafka");
+        props.put("offsets.topic.replication.factor", (short) 1);
+        props.put("offsets.topic.num.partitions", 1);
+
+        // Plug in Apache Ranger authorizer
+        props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
+
+        // Create users for testing
+        UserGroupInformation.createUserForTesting("client@kafka.apache.org", new String[] {"public"});
+        UserGroupInformation.createUserForTesting("kafka/localhost@kafka.apache.org", new String[] {"IT"});
+
+        KafkaConfig config = new KafkaConfig(props);
+        kafkaServer = new KafkaServerStartable(config);
+        kafkaServer.startup();
+    }
+
+    private static void configureKerby(String baseDir) throws Exception {
+
+        //System.setProperty("sun.security.krb5.debug", "true");
+        System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf");
+
+        kerbyServer = new SimpleKdcServer();
+
+        kerbyServer.setKdcRealm("kafka.apache.org");
+        kerbyServer.setAllowUdp(false);
+        kerbyServer.setWorkDir(new File(baseDir + "/target"));
+
+        kerbyServer.init();
+
+        // Create principals
+        String zookeeper = "zookeeper/localhost@kafka.apache.org";
+        String kafka = "kafka/localhost@kafka.apache.org";
+        String client = "client@kafka.apache.org";
+
+        kerbyServer.createPrincipal(zookeeper, "zookeeper");
+        File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
+        kerbyServer.exportPrincipal(zookeeper, keytabFile);
+
+        kerbyServer.createPrincipal(kafka, "kafka");
+        keytabFile = new File(baseDir + "/target/kafka.keytab");
+        kerbyServer.exportPrincipal(kafka, keytabFile);
+
+        kerbyServer.createPrincipal(client, "client");
+        keytabFile = new File(baseDir + "/target/client.keytab");
+        kerbyServer.exportPrincipal(client, keytabFile);
+
+        kerbyServer.start();
+    }
+
+    @org.junit.AfterClass
+    public static void cleanup() throws Exception {
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+        if (zkServer != null) {
+            zkServer.stop();
+        }
+        if (kerbyServer != null) {
+            kerbyServer.stop();
+        }
+    }
+
+    @Test
+    public void testCreateTopic() throws Exception {
+        final String topic = "test";
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port);
+        properties.put("client.id", "test-consumer-id");
+        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        AdminClient client = KafkaAdminClient.create(properties);
+        CreateTopicsResult result = client.createTopics(Arrays.asList(new NewTopic(topic, 1, (short) 1)));
+        result.values().get(topic).get();
+        for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
+            System.out.println("Create Topic : " + entry.getKey() + " " +
+                    "isCancelled : " + entry.getValue().isCancelled() + " " +
+                    "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " +
+                    "isDone : " + entry.getValue().isDone());
+        }
+        client.close();
+    }
+}
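
testCreateTopic() above only prints the state of each create-topic future; get() will still throw (failing the test) if creation is denied. A sketch of making the outcome explicit with assertions instead, using the JUnit Assert API already used by the other tests in this module (result is assumed in scope as above):

    // Sketch: assert each create-topic future completed successfully.
    for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) {
        entry.getValue().get(); // throws ExecutionException if the broker denied creation
        Assert.assertTrue(entry.getValue().isDone());
        Assert.assertFalse(entry.getValue().isCompletedExceptionally());
    }
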
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json
index 0c07604..e4f5db1 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -6,6 +6,84 @@
   "policies": [
     {
       "service": "cl1_kafka",
+      "name": "all - cluster",
+      "policyType": 0,
+      "description": "Policy for all - cluster",
+      "isAuditEnabled": true,
+      "resources": {
+        "cluster": {
+          "values": [
+            "*"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "publish",
+              "isAllowed": true
+            },
+            {
+              "type": "consume",
+              "isAllowed": true
+            },
+            {
+              "type": "configure",
+              "isAllowed": true
+            },
+            {
+              "type": "describe",
+              "isAllowed": true
+            },
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "delete",
+              "isAllowed": true
+            },
+            {
+              "type": "kafka_admin",
+              "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "admin","kafka"
+          ],
+          "groups": [
+            "IT"
+          ],
+          "conditions": [],
+          "delegateAdmin": true
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 40,
+      "isEnabled": true,
+      "version": 2
+    },
+    {
+      "service": "cl1_kafka",
       "name": "all - topic",
       "policyType": 0,
       "description": "Policy for all - topic",
@@ -64,7 +142,7 @@
             }
           ],
           "users": [
-            "admin","kafka"
+            "admin","kafka", "client"
           ],
           "groups": [
             "IT"
@@ -243,6 +321,84 @@
       "id": 30,
       "isEnabled": true,
       "version": 1
+    },
+    {
+      "service": "cl1_kafka",
+      "name": "DelegationToken Policy",
+      "policyType": 0,
+      "description": "DelegationTokenPolicy",
+      "isAuditEnabled": true,
+      "resources": {
+        "delegationtoken": {
+          "values": [
+            "*"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "publish",
+              "isAllowed": true
+            },
+            {
+              "type": "consume",
+              "isAllowed": true
+            },
+            {
+              "type": "configure",
+              "isAllowed": true
+            },
+            {
+              "type": "describe",
+              "isAllowed": true
+            },
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "delete",
+              "isAllowed": true
+            },
+            {
+              "type": "kafka_admin",
+              "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "admin","kafka", "client"
+          ],
+          "groups": [
+            "IT"
+          ],
+          "conditions": [],
+          "delegateAdmin": true
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 31,
+      "isEnabled": true,
+      "version": 2
     }
   ],
   "serviceDef": {
@@ -322,6 +478,46 @@
         "uiHint":"",
         "label":"Transactional Id",
         "description":"Transactional Id"
+      },
+      {
+        "itemId":3,
+        "name":"cluster",
+        "type":"string",
+        "level":1,
+        "mandatory":true,
+        "lookupSupported":false,
+        "recursiveSupported":false,
+        "excludesSupported":true,
+        "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions":{
+          "wildCard":true,
+          "ignoreCase":true
+        },
+        "validationRegEx":"",
+        "validationMessage":"",
+        "uiHint":"",
+        "label":"Cluster",
+        "description":"Cluster"
+      },
+      {
+        "itemId":4,
+        "name":"delegationtoken",
+        "type":"string",
+        "level":1,
+        "mandatory":true,
+        "lookupSupported":false,
+        "recursiveSupported":false,
+        "excludesSupported":true,
+        "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions":{
+          "wildCard":true,
+          "ignoreCase":true
+        },
+        "validationRegEx":"",
+        "validationMessage":"",
+        "uiHint":"",
+        "label":"Delegation Token",
+        "description":"Delegation Token"
       }
     ],
     "accessTypes": [
diff --git a/plugin-kafka/src/test/resources/kafka_kerberos.jaas b/plugin-kafka/src/test/resources/kafka_kerberos.jaas
index 1de804b..2e83c7c 100644
--- a/plugin-kafka/src/test/resources/kafka_kerberos.jaas
+++ b/plugin-kafka/src/test/resources/kafka_kerberos.jaas
@@ -1,20 +1,20 @@
 
 Server {
-        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/zookeeper.keytab" storeKey=true principal="zookeeper/localhost";
 };
 
 KafkaServer {
-        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/kafka.keytab" storeKey=true principal="kafka/localhost";
 };
 
 Client {
-        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/kafka.keytab" storeKey=true principal="kafka/localhost";
 };
 
 KafkaClient {
-        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/client.keytab" storeKey=true principal="client";
 };
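
The serviceName="kafka" option added to each login module supplies the Kerberos service name to the SASL layer when a client has not set sasl.kerberos.service.name itself. The JVM is pointed at this JAAS file through the standard system property, as the new test does after substituting <basedir> (the path below is illustrative):

    // Must be set before any Kafka client or broker performs a SASL login.
    System.setProperty("java.security.auth.login.config",
            "/path/to/target/test-classes/kafka_kerberos.jaas");
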
diff --git a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
index f743a65..0066339 100644
--- a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
+++ b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
@@ -1430,4 +1430,5 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10019',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10020',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
diff --git a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
index a7d6f73..de12102 100644
--- a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
+++ b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
@@ -1575,5 +1575,6 @@ INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,act
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10019',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10020',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10025',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
+INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10018',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'JAVA_PATCHES',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 commit;
diff --git a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
index 8969c26..35a133a 100644
--- a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
+++ b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
@@ -1519,6 +1519,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10019',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10020',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 
 DROP VIEW IF EXISTS vx_trx_log;
diff --git a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
index 9dc2515..43da93f 100644
--- a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
+++ b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
@@ -1869,6 +1869,8 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 GO
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+GO
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
 exit
diff --git a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
index 8351c70..a1ac530 100644
--- a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
+++ b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
@@ -3289,6 +3289,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10019',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10020',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10025',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
 CREATE VIEW [dbo].[vx_trx_log] AS
diff --git a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
new file mode 100644
index 0000000..0ef1544
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.patch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.biz.RangerBizUtil;
+import org.apache.ranger.biz.ServiceDBStore;
+import org.apache.ranger.common.GUIDUtil;
+import org.apache.ranger.common.JSONUtil;
+import org.apache.ranger.common.RangerValidatorFactory;
+import org.apache.ranger.common.StringUtil;
+import org.apache.ranger.db.RangerDaoManager;
+import org.apache.ranger.entity.XXAccessTypeDef;
+import org.apache.ranger.entity.XXPolicy;
+import org.apache.ranger.entity.XXPolicyItem;
+import org.apache.ranger.entity.XXPolicyItemAccess;
+import org.apache.ranger.entity.XXPolicyItemUserPerm;
+import org.apache.ranger.entity.XXPolicyResource;
+import org.apache.ranger.entity.XXPolicyResourceMap;
+import org.apache.ranger.entity.XXPortalUser;
+import org.apache.ranger.entity.XXResourceDef;
+import org.apache.ranger.entity.XXService;
+import org.apache.ranger.entity.XXServiceDef;
+import org.apache.ranger.entity.XXUser;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
+import org.apache.ranger.plugin.model.RangerServiceDef;
+import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
+import org.apache.ranger.plugin.model.validation.RangerValidator.Action;
+import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.service.RangerPolicyService;
+import org.apache.ranger.service.XPermMapService;
+import org.apache.ranger.service.XPolicyService;
+import org.apache.ranger.util.CLIUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class PatchForKafkaServiceDefUpdate_J10025 extends BaseLoader {
+	private static final Logger logger = Logger.getLogger(PatchForKafkaServiceDefUpdate_J10025.class);
+	private static final List<String> POLICY_NAMES = new ArrayList<>(Arrays.asList("all - cluster", "all - delegationtoken"));
+	private static final String LOGIN_ID_ADMIN = "admin";
+	private static final String KAFKA_RESOURCE_CLUSTER = "cluster";
+	private static final String KAFKA_RESOURCE_DELEGATIONTOKEN = "delegationtoken";
+
+	private static final List<String> DEFAULT_POLICY_USERS = new ArrayList<>(Arrays.asList("kafka","rangerlookup"));
+
+
+	public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME  = "kafka";
+	public static final String CLUSTER_RESOURCE_NAME = "cluster";
+
+
+	@Autowired
+	RangerDaoManager daoMgr;
+
+	@Autowired
+	ServiceDBStore svcDBStore;
+
+	@Autowired
+	JSONUtil jsonUtil;
+
+	@Autowired
+	RangerPolicyService policyService;
+
+	@Autowired
+	StringUtil stringUtil;
+
+	@Autowired
+	GUIDUtil guidUtil;
+
+	@Autowired
+	XPolicyService xPolService;
+
+	@Autowired
+	XPermMapService xPermMapService;
+
+	@Autowired
+	RangerBizUtil bizUtil;
+
+	@Autowired
+	RangerValidatorFactory validatorFactory;
+
+	@Autowired
+	ServiceDBStore svcStore;
+
+	public static void main(String[] args) {
+		logger.info("main()");
+		try {
+			PatchForKafkaServiceDefUpdate_J10025 loader = (PatchForKafkaServiceDefUpdate_J10025) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10025.class);
+			loader.init();
+			while (loader.isMoreToProcess()) {
+				loader.load();
+			}
+			logger.info("Load complete. Exiting!!!");
+			System.exit(0);
+		} catch (Exception e) {
+			logger.error("Error loading", e);
+			System.exit(1);
+		}
+	}
+
+	@Override
+	public void init() throws Exception {
+		// Do Nothing
+	}
+
+	@Override
+	public void execLoad() {
+		logger.info("==> PatchForKafkaServiceDefUpdate_J10025.execLoad()");
+		try {
+			updateKafkaServiceDef();
+		} catch (Exception e) {
+			logger.error("Error while applying PatchForKafkaServiceDefUpdate_J10025...", e);
+		}
+		logger.info("<== PatchForKafkaServiceDefUpdate_J10025.execLoad()");
+	}
+
+	@Override
+	public void printStats() {
+		logger.info("PatchForKafkaServiceDefUpdate_J10025 ");
+	}
+
+	private void updateKafkaServiceDef(){
+		RangerServiceDef ret                     = null;
+		RangerServiceDef embeddedKafkaServiceDef = null;
+		RangerServiceDef dbKafkaServiceDef       = null;
+		List<RangerServiceDef.RangerResourceDef>   embeddedKafkaResourceDefs = null;
+		List<RangerServiceDef.RangerAccessTypeDef> embeddedKafkaAccessTypes  = null;
+		XXServiceDef xXServiceDefObj             = null;
+		try{
+			embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+			if(embeddedKafkaServiceDef!=null){
+
+				xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+				Map<String, String> serviceDefOptionsPreUpdate=null;
+				String jsonStrPreUpdate=null;
+				if(xXServiceDefObj!=null) {
+					jsonStrPreUpdate=xXServiceDefObj.getDefOptions();
+					serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate);
+					xXServiceDefObj=null;
+				}
+				dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+
+				if(dbKafkaServiceDef!=null){
+					embeddedKafkaResourceDefs = embeddedKafkaServiceDef.getResources();
+					embeddedKafkaAccessTypes  = embeddedKafkaServiceDef.getAccessTypes();
+
+					if (checkNewKafkaResourcePresent(embeddedKafkaResourceDefs)) {
+						// The CLUSTER resource has been added to the embedded resource definition, so update the resource defs and access-type defs accordingly
+						if (embeddedKafkaResourceDefs != null) {
+							dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs);
+						}
+						if (embeddedKafkaAccessTypes != null) {
+							if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString())) {
+								dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes);
+							}
+						}
+					}
+
+					RangerServiceDefValidator validator = validatorFactory.getServiceDefValidator(svcStore);
+					validator.validate(dbKafkaServiceDef, Action.UPDATE);
+
+					ret = svcStore.updateServiceDef(dbKafkaServiceDef);
+					if(ret==null){
+						logger.error("Error while updating " + SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME + " service-def");
+						throw new RuntimeException("Error while updating " + SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME + " service-def");
+					}
+					xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+					if(xXServiceDefObj!=null) {
+						String jsonStrPostUpdate=xXServiceDefObj.getDefOptions();
+						Map<String, String> serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate);
+						if (serviceDefOptionsPostUpdate != null && serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) {
+							if(serviceDefOptionsPreUpdate == null || !serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) {
+								String preUpdateValue = serviceDefOptionsPreUpdate == null ? null : serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+								if (preUpdateValue == null) {
+									serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+								} else {
+									serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES, preUpdateValue);
+								}
+								xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate));
+								daoMgr.getXXServiceDef().update(xXServiceDefObj);
+							}
+						}
+						createDefaultPolicyForNewResources();
+					}
+				}
+			}
+		} catch (Exception e) {
+			logger.error("Error while updating " + SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME + " service-def", e);
+		}
+	}
+
+	private boolean checkNewKafkaResourcePresent(List<RangerServiceDef.RangerResourceDef> resourceDefs) {
+		boolean ret = false;
+		for (RangerServiceDef.RangerResourceDef resourceDef : resourceDefs) {
+			if (CLUSTER_RESOURCE_NAME.equals(resourceDef.getName())) {
+				ret = true;
+				break;
+			}
+		}
+		return ret;
+	}
+
+	private String mapToJsonString(Map<String, String> map) {
+		String ret = null;
+		if(map != null) {
+			try {
+				ret = jsonUtil.readMapToString(map);
+			} catch(Exception excp) {
+				logger.warn("mapToJsonString() failed to convert map: " + map, excp);
+			}
+		}
+		return ret;
+	}
+
+	protected Map<String, String> jsonStringToMap(String jsonStr) {
+		Map<String, String> ret = null;
+		if(!StringUtils.isEmpty(jsonStr)) {
+			try {
+				ret = jsonUtil.jsonToMap(jsonStr);
+			} catch(Exception excp) {
+				// fallback to earlier format: "name1=value1;name2=value2"
+				for(String optionString : jsonStr.split(";")) {
+					if(StringUtils.isEmpty(optionString)) {
+						continue;
+					}
+					String[] nvArr = optionString.split("=");
+					String name  = (nvArr != null && nvArr.length > 0) ? nvArr[0].trim() : null;
+					String value = (nvArr != null && nvArr.length > 1) ? nvArr[1].trim() : null;
+					if(StringUtils.isEmpty(name)) {
+						continue;
+					}
+					if(ret == null) {
+						ret = new HashMap<String, String>();
+					}
+					ret.put(name, value);
+				}
+			}
+		}
+		return ret;
+	}
+
+	private void createDefaultPolicyForNewResources() {
+		logger.info("==> createDefaultPolicyForNewResources ");
+		XXPortalUser xxPortalUser = daoMgr.getXXPortalUser().findByLoginId(LOGIN_ID_ADMIN);
+		Long currentUserId = xxPortalUser.getId();
+
+		XXServiceDef xXServiceDefObj = daoMgr.getXXServiceDef()
+				.findByName(EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+		if (xXServiceDefObj == null) {
+			logger.debug("ServiceDef not fount with name :" + EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+			return;
+		}
+
+		Long xServiceDefId = xXServiceDefObj.getId();
+		List<XXService> xxServices = daoMgr.getXXService().findByServiceDefId(xServiceDefId);
+
+		for (XXService xxService : xxServices) {
+			int resourceMapOrder = 0;
+			for (String newResource : POLICY_NAMES) {
+				XXPolicy xxPolicy = new XXPolicy();
+				xxPolicy.setName(newResource);
+				xxPolicy.setDescription(newResource);
+				xxPolicy.setService(xxService.getId());
+				xxPolicy.setPolicyPriority(RangerPolicy.POLICY_PRIORITY_NORMAL);
+				xxPolicy.setIsAuditEnabled(Boolean.TRUE);
+				xxPolicy.setIsEnabled(Boolean.TRUE);
+				xxPolicy.setPolicyType(RangerPolicy.POLICY_TYPE_ACCESS);
+				xxPolicy.setGuid(guidUtil.genGUID());
+				xxPolicy.setAddedByUserId(currentUserId);
+				xxPolicy.setUpdatedByUserId(currentUserId);
+				RangerPolicy rangerPolicy = new RangerPolicy();
+				RangerPolicyResourceSignature resourceSignature = new RangerPolicyResourceSignature(rangerPolicy);
+				xxPolicy.setResourceSignature(resourceSignature.getSignature());
+				XXPolicy createdPolicy = daoMgr.getXXPolicy().create(xxPolicy);
+
+				XXPolicyItem xxPolicyItem = new XXPolicyItem();
+				xxPolicyItem.setIsEnabled(Boolean.TRUE);
+				xxPolicyItem.setDelegateAdmin(Boolean.TRUE);
+				xxPolicyItem.setItemType(0);
+				xxPolicyItem.setOrder(0);
+				xxPolicyItem.setAddedByUserId(currentUserId);
+				xxPolicyItem.setUpdatedByUserId(currentUserId);
+				xxPolicyItem.setPolicyId(createdPolicy.getId());
+				XXPolicyItem createdXXPolicyItem = daoMgr.getXXPolicyItem().create(xxPolicyItem);
+
+				List<String> accessTypes = Arrays.asList("create", "delete", "configure", "alter_configs", "describe", "describe_configs", "consume", "publish", "kafka_admin","idempotent_write");
+				for (int i = 0; i < accessTypes.size(); i++) {
+					XXAccessTypeDef xAccTypeDef = daoMgr.getXXAccessTypeDef().findByNameAndServiceId(accessTypes.get(i),
+							xxPolicy.getService());
+					if (xAccTypeDef == null) {
+						throw new RuntimeException(accessTypes.get(i) + ": is not a valid access-type. policy='"
+								+ xxPolicy.getName() + "' service='" + xxPolicy.getService() + "'");
+					}
+					XXPolicyItemAccess xPolItemAcc = new XXPolicyItemAccess();
+					xPolItemAcc.setIsAllowed(Boolean.TRUE);
+					xPolItemAcc.setType(xAccTypeDef.getId());
+					xPolItemAcc.setOrder(i);
+					xPolItemAcc.setAddedByUserId(currentUserId);
+					xPolItemAcc.setUpdatedByUserId(currentUserId);
+					xPolItemAcc.setPolicyitemid(createdXXPolicyItem.getId());
+					daoMgr.getXXPolicyItemAccess().create(xPolItemAcc);
+				}
+
+				for (int i = 0; i < DEFAULT_POLICY_USERS.size(); i++) {
+					String user = DEFAULT_POLICY_USERS.get(i);
+					if (StringUtils.isBlank(user)) {
+						continue;
+					}
+					XXUser xxUser = daoMgr.getXXUser().findByUserName(user);
+					if (xxUser == null) {
+						throw new RuntimeException(user + ": user does not exist. policy='" + xxPolicy.getName()
+								+ "' service='" + xxPolicy.getService() + "' user='" + user + "'");
+					}
+					XXPolicyItemUserPerm xUserPerm = new XXPolicyItemUserPerm();
+					xUserPerm.setUserId(xxUser.getId());
+					xUserPerm.setPolicyItemId(createdXXPolicyItem.getId());
+					xUserPerm.setOrder(i);
+					xUserPerm.setAddedByUserId(currentUserId);
+					xUserPerm.setUpdatedByUserId(currentUserId);
+					daoMgr.getXXPolicyItemUserPerm().create(xUserPerm);
+				}
+
+				String policyResourceName = KAFKA_RESOURCE_CLUSTER;
+				if ("all - delegationtoken".equals(newResource)) {
+					policyResourceName = KAFKA_RESOURCE_DELEGATIONTOKEN;
+				}
+				XXResourceDef xResDef = daoMgr.getXXResourceDef().findByNameAndPolicyId(policyResourceName,
+						createdPolicy.getId());
+				if (xResDef == null) {
+					throw new RuntimeException(policyResourceName + ": is not a valid resource-type. policy='"
+							+ createdPolicy.getName() + "' service='" + createdPolicy.getService() + "'");
+				}
+
+				XXPolicyResource xPolRes = new XXPolicyResource();
+
+				xPolRes.setAddedByUserId(currentUserId);
+				xPolRes.setUpdatedByUserId(currentUserId);
+				xPolRes.setIsExcludes(Boolean.FALSE);
+				xPolRes.setIsRecursive(Boolean.FALSE);
+				xPolRes.setPolicyId(createdPolicy.getId());
+				xPolRes.setResDefId(xResDef.getId());
+				xPolRes = daoMgr.getXXPolicyResource().create(xPolRes);
+
+				XXPolicyResourceMap xPolResMap = new XXPolicyResourceMap();
+				xPolResMap.setResourceId(xPolRes.getId());
+				xPolResMap.setValue("*");
+				xPolResMap.setOrder(resourceMapOrder);
+				xPolResMap.setAddedByUserId(currentUserId);
+				xPolResMap.setUpdatedByUserId(currentUserId);
+				daoMgr.getXXPolicyResourceMap().create(xPolResMap);
+				resourceMapOrder++;
+				logger.info("Creating policy for service id : " + xxService.getId());
+			}
+		}
+		logger.info("<== createDefaultPolicyForNewResources ");
+	}
+}
\ No newline at end of file
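
jsonStringToMap() above first tries to parse defOptions as JSON and only then falls back to the legacy "name1=value1;name2=value2" encoding. A self-contained sketch of just that fallback, with a usage example:

    import java.util.HashMap;
    import java.util.Map;

    public class DefOptionsFallback {
        // Re-implements the legacy semicolon-separated fallback used by
        // jsonStringToMap() when the stored defOptions string is not JSON.
        public static Map<String, String> parse(String optionsStr) {
            Map<String, String> ret = new HashMap<>();
            for (String option : optionsStr.split(";")) {
                if (option.isEmpty()) {
                    continue;
                }
                String[] nv = option.split("=");
                String name  = nv.length > 0 ? nv[0].trim() : null;
                String value = nv.length > 1 ? nv[1].trim() : null;
                if (name != null && !name.isEmpty()) {
                    ret.put(name, value);
                }
            }
            return ret;
        }

        public static void main(String[] args) {
            // e.g. {enableDenyAndExceptionsInPolicies=true, foo=bar} (map order may vary)
            System.out.println(parse("enableDenyAndExceptionsInPolicies=true;foo=bar"));
        }
    }
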
diff --git a/src/main/assembly/plugin-kafka.xml b/src/main/assembly/plugin-kafka.xml
index 97ff8ad..7c55128 100644
--- a/src/main/assembly/plugin-kafka.xml
+++ b/src/main/assembly/plugin-kafka.xml
@@ -62,7 +62,6 @@
 							</include>
 							<include>commons-lang:commons-lang</include>
 							<include>commons-io:commons-io</include>
-							<include>com.google.guava:guava:jar:${google.guava.version}</include>
 							<include>org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version}
 							</include>
 							<include>org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version}