You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/10/28 15:57:54 UTC

[atlas] branch branch-2.0 updated: ATLAS-3864 : Break the dependency between Atlas and Kafka's Zookeeper

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b1e76ba  ATLAS-3864 : Break the dependency between Atlas and Kafka's Zookeeper
b1e76ba is described below

commit b1e76ba877b2d2cf0359dfdc2e34b3542b68067f
Author: Jayendra Parab <ja...@freestoneinfotech.com>
AuthorDate: Tue Sep 22 11:58:10 2020 +0530

    ATLAS-3864 : Break the dependency between Atlas and Kafka's Zookeeper
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
    (cherry picked from commit d4b15c7a2ab47d64a49070fab766a95497db4f0f)
---
 .../atlas/hive/bridge/HiveMetaStoreBridgeTest.java |   2 +-
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  62 ++---
 common/pom.xml                                     |  15 +-
 .../java/org/apache/atlas/utils/KafkaUtils.java    | 279 +++++++++++++++++++++
 .../org/apache/atlas/utils/KafkaUtilsTest.java     | 215 ++++++++++++++++
 .../org/apache/atlas/hook/AtlasTopicCreator.java   |  53 +---
 .../org/apache/atlas/kafka/KafkaNotification.java  | 139 +---------
 .../apache/atlas/hook/AtlasTopicCreatorTest.java   | 245 +++---------------
 .../atlas/kafka/KafkaNotificationMockTest.java     | 196 ---------------
 .../store/graph/v2/BulkImportPercentTest.java      |   2 +-
 .../notification/NotificationHookConsumerTest.java |   2 +-
 .../atlas/web/service/CuratorFactoryTest.java      |   3 +-
 12 files changed, 587 insertions(+), 626 deletions(-)

diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index af61ade..ae7ab1a 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -310,7 +310,7 @@ public class HiveMetaStoreBridgeTest {
         return table;
     }
 
-    private class MatchesReferenceableProperty extends ArgumentMatcher<Object> {
+    private class MatchesReferenceableProperty implements ArgumentMatcher<Object> {
         private final String attrName;
         private final Object attrValue;
 
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 40b1fee..a13b029 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -18,12 +18,7 @@
 
 package org.apache.atlas.kafka.bridge;
 
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
 import org.apache.atlas.kafka.model.KafkaDataTypes;
@@ -32,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.utils.KafkaUtils;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -53,6 +49,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
 public class KafkaBridge {
@@ -74,22 +71,17 @@ public class KafkaBridge {
     private static final String TOPIC                      = "topic";
 
     private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME       = "%s@%s";
-    private static final String ZOOKEEPER_CONNECT                       = "atlas.kafka.zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS         = "atlas.kafka.zookeeper.connection.timeout.ms";
-    private static final String ZOOKEEPER_SESSION_TIMEOUT_MS            = "atlas.kafka.zookeeper.session.timeout.ms";
-    private static final String DEFAULT_ZOOKEEPER_CONNECT               = "localhost:2181";
-    private static final int    DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS    = 10 * 1000;
-    private static final int    DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000;
 
     private final List<String>  availableTopics;
     private final String        metadataNamespace;
     private final AtlasClientV2 atlasClientV2;
-    private final ZkUtils       zkUtils;
+    private final KafkaUtils    kafkaUtils;
 
 
     public static void main(String[] args) {
         int exitCode = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
+        KafkaBridge importer = null;
 
         try {
             Options options = new Options();
@@ -118,7 +110,7 @@ public class KafkaBridge {
                 atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
             }
 
-            KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2);
+            importer = new KafkaBridge(atlasConf, atlasClientV2);
 
             if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
@@ -153,21 +145,25 @@ public class KafkaBridge {
             if (atlasClientV2 != null) {
                 atlasClientV2.close();
             }
+            if (importer != null) {
+                importer.close();
+            }
         }
 
         System.exit(exitCode);
     }
 
     public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
-        String   zookeeperConnect    = getZKConnection(atlasConf);
-        int      sessionTimeOutMs    = atlasConf.getInt(ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS) ;
-        int      connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
-        ZkClient zkClient            = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
-
         this.atlasClientV2     = atlasClientV2;
         this.metadataNamespace = getMetadataNamespace(atlasConf);
-        this.zkUtils           = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
-        this.availableTopics   = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
+        this.kafkaUtils        = new KafkaUtils(atlasConf);
+        this.availableTopics   = kafkaUtils.listAllTopics();
+    }
+
+    public void close() {
+        if (this.kafkaUtils != null) {
+            this.kafkaUtils.close();
+        }
     }
 
     private String getMetadataNamespace(Configuration config) {
@@ -225,7 +221,7 @@ public class KafkaBridge {
     }
 
     @VisibleForTesting
-    AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) {
+    AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws Exception {
         final AtlasEntity ret;
 
         if (topicEntity == null) {
@@ -242,7 +238,12 @@ public class KafkaBridge {
         ret.setAttribute(NAME,topic);
         ret.setAttribute(DESCRIPTION_ATTR, topic);
         ret.setAttribute(URI, topic);
-        ret.setAttribute(PARTITION_COUNT, (Integer) zkUtils.getTopicPartitionCount(topic).get());
+        try {
+            ret.setAttribute(PARTITION_COUNT, kafkaUtils.getPartitionCount(topic));
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("Error while getting partition count for topic :" + topic, e);
+            throw new Exception("Error while getting partition count for topic :" + topic, e);
+        }
 
         return ret;
     }
@@ -351,21 +352,4 @@ public class KafkaBridge {
             entity.getRelationshipAttributes().clear();
         }
     }
-
-    private String getStringValue(String[] vals) {
-        String ret = null;
-        for(String val:vals) {
-            ret = (ret == null) ? val : ret + "," + val;
-        }
-        return  ret;
-    }
-
-    private String getZKConnection(Configuration atlasConf) {
-        String ret = null;
-        ret = getStringValue(atlasConf.getStringArray(ZOOKEEPER_CONNECT));
-        if (StringUtils.isEmpty(ret) ) {
-            ret = DEFAULT_ZOOKEEPER_CONNECT;
-        }
-        return ret;
-    }
 }
diff --git a/common/pom.xml b/common/pom.xml
index bb1157f..ca35b7f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -81,7 +81,14 @@
 
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
+            <version>3.5.10</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.5.10</version>
         </dependency>
 
         <dependency>
@@ -115,6 +122,12 @@
             <version>1.3.2</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
new file mode 100644
index 0000000..7a397b1
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class KafkaUtils implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
+    static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
+    private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
+    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
+    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
+    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
+    private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
+    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
+    private static final String JAAS_PRINCIPAL_PROP = "principal";
+    private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
+    private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
+
+    public static final String ATLAS_KAFKA_PROPERTY_PREFIX   = "atlas.kafka";
+
+    final protected Properties kafkaConfiguration;
+
+    final protected AdminClient adminClient;
+
+    /**
+     * Reads the Kafka client settings stored under the "atlas.kafka" prefix, adds the
+     * JAAS settings to them and creates the admin client used by the other methods.
+     *
+     * @param atlasConfiguration Atlas configuration the settings are taken from
+     */
+    public KafkaUtils(Configuration atlasConfiguration) {
+        LOG.debug("==> KafkaUtils() ");
+        this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(atlasConfiguration, ATLAS_KAFKA_PROPERTY_PREFIX);
+        setKafkaJAASProperties(atlasConfiguration, this.kafkaConfiguration);
+        this.adminClient = AdminClient.create(this.kafkaConfiguration);
+        LOG.debug("<== KafkaUtils() ");
+    }
+
+    /**
+     * Requests creation of the given topics and waits for each request to complete.
+     *
+     * @param topicNames        names of the topics to create
+     * @param numPartitions     partition count used for every topic
+     * @param replicationFactor replication factor used for every topic (narrowed to short)
+     * @throws ExecutionException   if waiting on a creation request ends with a failure
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    public void createTopics(List<String> topicNames, int numPartitions, int replicationFactor)
+            throws TopicExistsException, ExecutionException, InterruptedException {
+        LOG.debug("==> createTopics() ");
+        List<NewTopic> newTopicList = topicNames.stream()
+                .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
+                .collect(Collectors.toList());
+        CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
+        Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();
+        // Only the futures are needed; wait on each one so a failed creation reaches the caller.
+        for (KafkaFuture<Void> future : futureMap.values()) {
+            future.get();
+        }
+        LOG.debug("<== createTopics() ");
+    }
+
+    /**
+     * Fetches the names of all topics reported by the admin client.
+     * @return a new list holding the topic names
+     * @throws ExecutionException   if waiting on the listing ends with a failure
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    public List<String> listAllTopics() throws ExecutionException, InterruptedException {
+        LOG.debug("==> KafkaUtils.listAllTopics() ");
+        ListTopicsResult listing = adminClient.listTopics();
+        List<String> names = new ArrayList<>(listing.names().get());
+        LOG.debug("<== KafkaUtils.listAllTopics() ");
+        return names;
+    }
+
+    /**
+     * Looks up the number of partitions of a topic.
+     *
+     * @param topicName name of the topic to describe
+     * @return the partition count, or null when the describe result holds no entry
+     * @throws ExecutionException   if waiting on the description ends with a failure
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    public Integer getPartitionCount(String topicName) throws ExecutionException, InterruptedException {
+        LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName);
+
+        Integer count = null;
+        DescribeTopicsResult described = adminClient.describeTopics(Collections.singleton(topicName));
+        Map<String, KafkaFuture<TopicDescription>> pendingByTopic = described.values();
+        for (KafkaFuture<TopicDescription> pending : pendingByTopic.values()) {
+            List<TopicPartitionInfo> partitions = pending.get().partitions();
+            count = partitions.size();
+        }
+        LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, count);
+        return count;
+    }
+
+    /**
+     * Closes the underlying admin client, if one was created.
+     */
+    @Override
+    public void close() {
+        LOG.debug("==> KafkaUtils.close()");
+        if (adminClient != null) {
+            adminClient.close();
+        }
+
+        LOG.debug("<== KafkaUtils.close()");
+    }
+
+    /**
+     * Derives Kafka's "sasl.jaas.config" property from the "atlas.jaas" entries of the configuration.
+     * Nothing is written when the property is already present, when no JAAS entries are configured,
+     * or when the selected client has no loginModuleName.
+     *
+     * @param configuration   Atlas configuration holding the atlas.jaas.* entries
+     * @param kafkaProperties Kafka client properties that receive the generated value
+     */
+    public static void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
+        LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");
+        if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+            LOG.debug("JAAS config is already set, returning");
+            return;
+        }
+
+        Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
+        // If a JAAS configuration is present, translate its properties into sasl.jaas.config
+        if(jaasConfig != null && !jaasConfig.isEmpty()) {
+            String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
+
+            // Required for backward compatibility for Hive CLI
+            if (!isLoginKeytabBased() && isLoginTicketBased()) {
+                LOG.debug("Checking if ticketBased-KafkaClient is set");
+                // if ticketBased-KafkaClient property is not specified then use the default client name
+                String        ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
+                Configuration ticketBasedConfig       = configuration.subset(ticketBasedConfigPrefix);
+
+                if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+                    LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+                    jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
+                } else {
+                    LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
+                }
+            }
+
+            String keyPrefix       = jaasClientName + ".";
+            String keyParam        = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
+            String loginModuleName = jaasConfig.getProperty(keyParam);
+
+            if (loginModuleName == null) {
+                LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
+                return;
+            }
+
+            keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+            String controlFlag = jaasConfig.getProperty(keyParam);
+
+            if(StringUtils.isEmpty(controlFlag)) {
+                // Log the value that was actually configured before replacing it with the default
+                LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS);
+                controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+            }
+            String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
+            String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+            int optionPrefixLen = optionPrefix.length();
+            StringBuilder optionStringBuilder = new StringBuilder(); // method-local, no synchronization needed
+            for (String key : jaasConfig.stringPropertyNames()) {
+                if (key.startsWith(optionPrefix)) {
+                    String optionVal = jaasConfig.getProperty(key);
+                    if (optionVal != null) {
+                        optionVal = optionVal.trim();
+                        try {
+                            if (key.equalsIgnoreCase(principalOptionKey)) {
+                                optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
+                            }
+                        } catch (IOException e) {
+                            LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal, e);
+                        }
+                        optionVal = surroundWithQuotes(optionVal);
+                        optionStringBuilder.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
+                    }
+                }
+            }
+
+            String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuilder.toString());
+            kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
+        }
+
+        LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
+    }
+
+    /**
+     * Asks Hadoop's UserGroupInformation whether the login is keytab based; a failure is logged and reported as false.
+     */
+    static boolean isLoginKeytabBased() {
+        try {
+            return UserGroupInformation.isLoginKeytabBased();
+        } catch (Exception e) {
+            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", e);
+            return false;
+        }
+    }
+
+    /**
+     * Asks Hadoop's UserGroupInformation whether the login is ticket based; a failure is logged and reported as false.
+     */
+    public static boolean isLoginTicketBased() {
+        try {
+            return UserGroupInformation.isLoginTicketBased();
+        } catch (Exception e) {
+            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", e);
+            return false;
+        }
+    }
+
+    /**
+     * Wraps a JAAS option value in double quotes when it contains one of the characters /!@#%^&amp;*.
+     *
+     * @param optionVal option value; may be null or empty
+     * @return the value unchanged when it is null, empty, already enclosed in quotes or free of those characters
+     */
+    static String surroundWithQuotes(String optionVal) {
+        if (StringUtils.isEmpty(optionVal)) {
+            return optionVal;
+        }
+        // charAt (not indexOf) reads the first/last character; a value already enclosed in quotes is kept as is.
+        boolean alreadyQuoted = optionVal.length() > 1 && optionVal.charAt(0) == '"' && optionVal.charAt(optionVal.length() - 1) == '"';
+        // Values with special chars like "@" or "/" must be enclosed in double quotes so that Kafka can parse them.
+        final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+        if (!alreadyQuoted && StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
+            return String.format("\"%s\"", optionVal);
+        }
+        return optionVal;
+    }
+
+}
diff --git a/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
new file mode 100644
index 0000000..14739cd
--- /dev/null
+++ b/common/src/test/java/org/apache/atlas/utils/KafkaUtilsTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class KafkaUtilsTest {
+
+    /** All JAAS entries are configured: module name, control flag and every option must be carried over. */
+    @Test
+    public void testSetKafkaJAASPropertiesForAllProperValues() {
+        final String moduleName  = "com.sun.security.auth.module.Krb5LoginModule";
+        final String controlFlag = "required";
+        final String useKeyTab   = "false";
+        final String storeKey    = "true";
+        final String serviceName = "kafka";
+
+        Configuration atlasConf = new PropertiesConfiguration();
+        atlasConf.setProperty("atlas.jaas.KafkaClient.loginModuleName", moduleName);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", controlFlag);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", useKeyTab);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.storeKey", storeKey);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.serviceName", serviceName);
+
+        Properties kafkaProps = new Properties();
+        KafkaUtils.setKafkaJAASProperties(atlasConf, kafkaProps);
+        String jaasConfig = kafkaProps.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+        // The generated value must mention every configured piece.
+        assertTrue(jaasConfig.contains(moduleName), "loginModuleName not present in new property");
+        assertTrue(jaasConfig.contains(controlFlag), "loginModuleControlFlag not present in new property");
+        assertTrue(jaasConfig.contains("useKeyTab=" + useKeyTab), "useKeyTab not present in new property or value doesn't match");
+        assertTrue(jaasConfig.contains("storeKey=" + storeKey), "storeKey not present in new property or value doesn't match");
+        assertTrue(jaasConfig.contains("serviceName=" + serviceName), "serviceName not present in new property or value doesn't match");
+    }
+
+    /** No control flag is configured: the generated value must fall back to "required". */
+    @Test
+    public void testSetKafkaJAASPropertiesForMissingControlFlag() {
+        final String moduleName      = "com.sun.security.auth.module.Krb5LoginModule";
+        final String expectedDefault = "required";
+        final String useKeyTab       = "false";
+        final String storeKey        = "true";
+        final String serviceName     = "kafka";
+
+        Configuration atlasConf = new PropertiesConfiguration();
+        atlasConf.setProperty("atlas.jaas.KafkaClient.loginModuleName", moduleName);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", useKeyTab);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.storeKey", storeKey);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.serviceName", serviceName);
+
+        Properties kafkaProps = new Properties();
+        KafkaUtils.setKafkaJAASProperties(atlasConf, kafkaProps);
+        String jaasConfig = kafkaProps.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+
+        // The control flag check passes only because the default is filled in.
+        assertTrue(jaasConfig.contains(moduleName), "loginModuleName not present in new property");
+        assertTrue(jaasConfig.contains(expectedDefault), "loginModuleControlFlag not present in new property");
+        assertTrue(jaasConfig.contains("useKeyTab=" + useKeyTab), "useKeyTab not present in new property or value doesn't match");
+        assertTrue(jaasConfig.contains("storeKey=" + storeKey), "storeKey not present in new property or value doesn't match");
+        assertTrue(jaasConfig.contains("serviceName=" + serviceName), "serviceName not present in new property or value doesn't match");
+    }
+
+    /**
+     * Without a loginModuleName entry no sasl.jaas.config property may be generated.
+     */
+    @Test
+    public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
+        final String controlFlag = "required";
+        final String useKeyTab   = "false";
+        final String storeKey    = "true";
+        final String serviceName = "kafka";
+
+        Configuration atlasConf = new PropertiesConfiguration();
+        atlasConf.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", controlFlag);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", useKeyTab);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.storeKey", storeKey);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.serviceName", serviceName);
+
+        Properties kafkaProps = new Properties();
+        KafkaUtils.setKafkaJAASProperties(atlasConf, kafkaProps);
+
+        assertNull(kafkaProps.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY));
+    }
+
+    /** Option values containing "/" or "@" (keytab path, principal) must end up wrapped in double quotes. */
+    @Test
+    public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
+        final String moduleName  = "com.sun.security.auth.module.Krb5LoginModule";
+        final String controlFlag = "required";
+        final String keyTabPath  = "/path/to/file.keytab";
+        final String principal   = "test/_HOST@EXAMPLE.COM";
+
+        Configuration atlasConf = new PropertiesConfiguration();
+        atlasConf.setProperty("atlas.jaas.KafkaClient.loginModuleName", moduleName);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", controlFlag);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.keyTabPath", keyTabPath);
+        atlasConf.setProperty("atlas.jaas.KafkaClient.option.principal", principal);
+
+        Properties kafkaProps = new Properties();
+
+        try {
+            KafkaUtils.setKafkaJAASProperties(atlasConf, kafkaProps);
+            String jaasConfig = kafkaProps.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            // The principal is compared in the form SecurityUtil.getServerPrincipal returns for it.
+            String resolvedPrincipal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(principal, (String) null);
+
+            assertTrue(jaasConfig.contains(moduleName), "loginModuleName not present in new property");
+            assertTrue(jaasConfig.contains(controlFlag), "loginModuleControlFlag not present in new property");
+            assertTrue(jaasConfig.contains("keyTabPath=\"" + keyTabPath + "\""));
+            assertTrue(jaasConfig.contains("principal=\"" + resolvedPrincipal + "\""));
+        } catch (IOException e) {
+            fail("Failed while getting updated principal value with exception : " + e.getMessage());
+        }
+    }
+
+    /** Ticket-based login with a ticketBased-KafkaClient entry configured: that entry must be the one used. */
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        configuration.setProperty("atlas.kafka.bootstrap.servers", "localhost:9100");
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
+
+        // Typed MockedStatic avoids a raw type; the login checks are stubbed, the code under test runs for real.
+        try (MockedStatic<KafkaUtils> mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) {
+            mockedKafkaUtilsClass.when(KafkaUtils::isLoginKeytabBased).thenReturn(false);
+            mockedKafkaUtilsClass.when(KafkaUtils::isLoginTicketBased).thenReturn(true);
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod();
+            KafkaUtils.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=" + optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        }
+    }
+
+    @Test
+    public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
+        // Ticket-based login with no ticketBased-KafkaClient section: the default KafkaClient section must be used instead.
+        Properties properties = new Properties();
+        Configuration configuration = new PropertiesConfiguration();
+        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+        final String loginModuleControlFlag = "required";
+        final String optionUseKeyTab = "false";
+        final String optionStoreKey = "true";
+        final String optionServiceName = "kafka";
+
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
+        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
+        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
+        // Report a ticket-cache (non-keytab) login; surroundWithQuotes and setKafkaJAASProperties keep their real code.
+        try (MockedStatic<KafkaUtils> mockedKafkaUtilsClass = Mockito.mockStatic(KafkaUtils.class)) {
+            mockedKafkaUtilsClass.when(KafkaUtils::isLoginKeytabBased).thenReturn(false);
+            mockedKafkaUtilsClass.when(KafkaUtils::isLoginTicketBased).thenReturn(true);
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.surroundWithQuotes(Mockito.anyString())).thenCallRealMethod();
+            mockedKafkaUtilsClass.when(() -> KafkaUtils.setKafkaJAASProperties(configuration, properties)).thenCallRealMethod();
+
+            KafkaUtils.setKafkaJAASProperties(configuration, properties);
+            String newPropertyValue = properties.getProperty(KafkaUtils.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
+            assertTrue(newPropertyValue != null, "sasl.jaas.config property was not set");
+            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
+            assertTrue(newPropertyValue.contains(loginModuleControlFlag), "loginModuleControlFlag not present in new property");
+            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("storeKey=" + optionStoreKey), "storeKey not present in new property or value doesn't match");
+            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
+        }
+    }
+
+
+}
+
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
index c695741..80a12ac 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
@@ -19,24 +19,19 @@
 package org.apache.atlas.hook;
 
 import com.google.common.annotations.VisibleForTesting;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.atlas.utils.KafkaUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 import java.io.IOException;
-import java.util.Properties;
+import java.util.Arrays;
 
 /**
  * A class to create Kafka topics used by Atlas components.
@@ -66,20 +61,13 @@ public class AtlasTopicCreator {
             if (!handleSecurity(atlasProperties)) {
                 return;
             }
-            ZkUtils zkUtils = createZkUtils(atlasProperties);
-            for (String topicName : topicNames) {
-                try {
-                    LOG.warn("Attempting to create topic {}", topicName);
-                    if (!ifTopicExists(topicName, zkUtils)) {
-                        createTopic(atlasProperties, topicName, zkUtils);
-                    } else {
-                        LOG.warn("Ignoring call to create topic {}, as it already exists.", topicName);
-                    }
-                } catch (Throwable t) {
-                    LOG.error("Failed while creating topic {}", topicName, t);
-                }
+            try(KafkaUtils kafkaUtils = getKafkaUtils(atlasProperties)) {
+                int numPartitions = atlasProperties.getInt("atlas.notification.partitions", 1);
+                int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
+                kafkaUtils.createTopics(Arrays.asList(topicNames), numPartitions, numReplicas);
+            } catch (Exception e) {
+                LOG.error("Error while creating topics e :" + e.getMessage(), e);
             }
-            zkUtils.close();
         } else {
             LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","),
                     ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
@@ -105,28 +93,9 @@ public class AtlasTopicCreator {
         return true;
     }
 
-    @VisibleForTesting
-    protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-        return AdminUtils.topicExists(zkUtils, topicName);
-    }
-
-    @VisibleForTesting
-    protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-        int numPartitions = atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
-        int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
-        AdminUtils.createTopic(zkUtils, topicName,  numPartitions, numReplicas,
-                new Properties(), RackAwareMode.Enforced$.MODULE$);
-        LOG.warn("Created topic {} with partitions {} and replicas {}", topicName, numPartitions, numReplicas);
-    }
-
-    @VisibleForTesting
-    protected ZkUtils createZkUtils(Configuration atlasProperties) {
-        String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect");
-        int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
-        int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
-        Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(
-                zkConnect, sessionTimeout, connectionTimeout);
-        return new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false);
+    /** Creates the KafkaUtils used for topic creation (closed by the caller); package-private so tests can substitute a mock. */
+    KafkaUtils getKafkaUtils(Configuration configuration) {
+        return new KafkaUtils(configuration);
     }
 
     public static void main(String[] args) throws AtlasException {
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 05fd977..3d1b3cc 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -25,10 +25,10 @@ import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.service.Service;
+import org.apache.atlas.utils.KafkaUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -42,7 +42,6 @@ import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Future;
 
@@ -63,17 +62,6 @@ public class KafkaNotification extends AbstractNotification implements Service {
     public    static final String ATLAS_ENTITIES_TOPIC       = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString();
     protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id";
 
-    static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
-    private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
-    private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
-    private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
-    private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
-    private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
-    private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
-    private static final String JAAS_PRINCIPAL_PROP = "principal";
-    private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
-    private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
-
     private   static final String[] ATLAS_HOOK_CONSUMER_TOPICS     = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC);
     private   static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
 
@@ -144,7 +132,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
         // if no value is specified for max.poll.records, set to 1
         properties.put("max.poll.records", kafkaConf.getInt("max.poll.records", 1));
 
-        setKafkaJAASProperties(applicationProperties, properties);
+        KafkaUtils.setKafkaJAASProperties(applicationProperties, properties);
 
         LOG.info("<== KafkaNotification()");
     }
@@ -414,127 +402,4 @@ public class KafkaNotification extends AbstractNotification implements Service {
         return ret;
     }
 
-    void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
-        LOG.debug("==> KafkaNotification.setKafkaJAASProperties()");
-
-        if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
-            LOG.debug("JAAS config is already set, returning");
-            return;
-        }
-
-        Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
-        // JAAS Configuration is present then update set those properties in sasl.jaas.config
-        if(jaasConfig != null && !jaasConfig.isEmpty()) {
-            String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
-
-            // Required for backward compatability for Hive CLI
-            if (!isLoginKeytabBased() && isLoginTicketBased()) {
-                LOG.debug("Checking if ticketBased-KafkaClient is set");
-                // if ticketBased-KafkaClient property is not specified then use the default client name
-                String        ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
-                Configuration ticketBasedConfig       = configuration.subset(ticketBasedConfigPrefix);
-
-                if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
-                    LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
-
-                    jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
-                } else {
-                    LOG.info("UserGroupInformation.isLoginTicketBased is true, but no JAAS configuration found for client {}. Will use JAAS configuration of client {}", JAAS_TICKET_BASED_CLIENT_NAME, jaasClientName);
-                }
-            }
-
-            String keyPrefix       = jaasClientName + ".";
-            String keyParam        = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM;
-            String loginModuleName = jaasConfig.getProperty(keyParam);
-
-            if (loginModuleName == null) {
-                LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
-                return;
-            }
-
-            keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
-            String controlFlag = jaasConfig.getProperty(keyParam);
-
-            if(StringUtils.isEmpty(controlFlag)) {
-                String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
-                controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
-                LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues);
-            }
-            String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
-            String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
-            int optionPrefixLen = optionPrefix.length();
-            StringBuffer optionStringBuffer = new StringBuffer();
-            for (String key : jaasConfig.stringPropertyNames()) {
-                if (key.startsWith(optionPrefix)) {
-                    String optionVal = jaasConfig.getProperty(key);
-                    if (optionVal != null) {
-                        optionVal = optionVal.trim();
-
-                        try {
-                            if (key.equalsIgnoreCase(principalOptionKey)) {
-                                optionVal = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionVal, (String) null);
-                            }
-                        } catch (IOException e) {
-                            LOG.warn("Failed to build serverPrincipal. Using provided value:[{}]", optionVal);
-                        }
-
-                        optionVal = surroundWithQuotes(optionVal);
-                        optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
-                    }
-                }
-            }
-
-            String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
-            kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
-        }
-
-        LOG.debug("<== KafkaNotification.setKafkaJAASProperties()");
-    }
-
-    @VisibleForTesting
-    boolean isLoginKeytabBased() {
-        boolean ret = false;
-
-        try {
-            ret = UserGroupInformation.isLoginKeytabBased();
-        } catch (Exception excp) {
-            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", excp);
-        }
-
-        return ret;
-    }
-
-    @VisibleForTesting
-    boolean isLoginTicketBased() {
-        boolean ret = false;
-
-        try {
-            ret = UserGroupInformation.isLoginTicketBased();
-        } catch (Exception excp) {
-            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", excp);
-        }
-
-        return ret;
-    }
-
-    private static String surroundWithQuotes(String optionVal) {
-        if(StringUtils.isEmpty(optionVal)) {
-            return optionVal;
-        }
-        String ret = optionVal;
-
-        // For property values which have special chars like "@" or "/", we need to enclose it in
-        // double quotes, so that Kafka can parse it
-        // If the property is already enclosed in double quotes, then do nothing.
-        if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
-            // If the string as special characters like except _,-
-            final String SPECIAL_CHAR_LIST = "/!@#%^&*";
-            if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
-                ret = String.format("\"%s\"", optionVal);
-            }
-        }
-
-        return ret;
-    }
-
 }
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
index 2937847..6d1a5b6 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
@@ -18,20 +18,21 @@
 
 package org.apache.atlas.hook;
 
-import kafka.utils.ZkUtils;
 import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.utils.KafkaUtils;
 import org.apache.commons.configuration.Configuration;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
 
 public class AtlasTopicCreatorTest {
 
@@ -44,179 +45,35 @@ public class AtlasTopicCreatorTest {
         Configuration configuration = mock(Configuration.class);
         when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
                 thenReturn(false);
-        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final boolean[] topicExistsCalled = new boolean[] {false};
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                topicExistsCalled[0] = true;
-                return false;
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
-        assertFalse(topicExistsCalled[0]);
-    }
-
-    @Test
-    public void shouldNotCreateTopicIfItAlreadyExists() {
-        Configuration configuration = mock(Configuration.class);
-        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
-                thenReturn(true);
-        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-        final boolean[] topicExistsCalled = new boolean[]{false};
-        final boolean[] createTopicCalled = new boolean[]{false};
-
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                topicExistsCalled[0] = true;
-                return true;
-            }
-
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
-
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-                createTopicCalled[0] = true;
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
-        assertTrue(topicExistsCalled[0]);
-        assertFalse(createTopicCalled[0]);
-    }
-
-    @Test
-    public void shouldCreateTopicIfItDoesNotExist() {
-        Configuration configuration = mock(Configuration.class);
-        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
-                thenReturn(true);
-        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-
-        final boolean[] createdTopic = new boolean[]{false};
-
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                return false;
-            }
-
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
-
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-                createdTopic[0] = true;
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
-        assertTrue(createdTopic[0]);
-    }
 
-    @Test
-    public void shouldNotFailIfExceptionOccursDuringCreatingTopic() {
-        Configuration configuration = mock(Configuration.class);
-        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
-                thenReturn(true);
-        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-        final boolean[] createTopicCalled = new boolean[]{false};
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
+        AtlasTopicCreator spyAtlasTopicCreator = Mockito.spy(atlasTopicCreator);
+        spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
+        Mockito.verify(spyAtlasTopicCreator, times(0)).handleSecurity(configuration);
 
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                return false;
-            }
-
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
-
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-                createTopicCalled[0] = true;
-                throw new RuntimeException("Simulating failure during creating topic");
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
-        assertTrue(createTopicCalled[0]);
     }
 
     @Test
-    public void shouldCreateMultipleTopics() {
+    public void shouldCreateTopicIfConfiguredToDoSo() {
         Configuration configuration = mock(Configuration.class);
-        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
-                thenReturn(true);
-        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
 
-        final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put(ATLAS_HOOK_TOPIC, false);
-        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
-
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                return false;
-            }
-
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
-
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-                createdTopics.put(topicName, true);
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
-        assertTrue(createdTopics.get(ATLAS_HOOK_TOPIC));
-        assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
-    }
-
-    @Test
-    public void shouldCreateTopicEvenIfEarlierOneFails() {
-        Configuration configuration = mock(Configuration.class);
+        KafkaUtils mockKafkaUtils = Mockito.mock(KafkaUtils.class);
         when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
                 thenReturn(true);
         when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-
-        final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
 
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+        AtlasTopicCreator spyAtlasTopicCreator = Mockito.spy(atlasTopicCreator);
+        Mockito.doReturn(mockKafkaUtils).when(spyAtlasTopicCreator).getKafkaUtils(configuration);
 
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                return false;
-            }
+        spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
 
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
+        try {
+            verify(mockKafkaUtils).createTopics(anyList(), anyInt(), anyInt());
+        } catch (ExecutionException | InterruptedException e) {
+            Assert.fail("Caught exception while verifying createTopics: " + e.getMessage());
+        }
 
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-                if (topicName.equals(ATLAS_HOOK_TOPIC)) {
-                    throw new RuntimeException("Simulating failure when creating ATLAS_HOOK topic");
-                } else {
-                    createdTopics.put(topicName, true);
-                }
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
-        assertTrue(createdTopics.get(ATLAS_ENTITIES_TOPIC));
     }
 
     @Test
@@ -225,26 +82,17 @@ public class AtlasTopicCreatorTest {
         when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
                 thenReturn(true);
         when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        KafkaUtils mockKafkaUtils = Mockito.mock(KafkaUtils.class);
 
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                return false;
-            }
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
 
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
+        AtlasTopicCreator spyAtlasTopicCreator = Mockito.spy(atlasTopicCreator);
+        Mockito.doReturn(mockKafkaUtils).when(spyAtlasTopicCreator).getKafkaUtils(configuration);
 
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
+        spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC);
+
+        verify(mockKafkaUtils).close();
 
-        verify(zookeeperUtils, times(1)).close();
     }
 
     @Test
@@ -252,34 +100,19 @@ public class AtlasTopicCreatorTest {
         Configuration configuration = mock(Configuration.class);
         when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
                 thenReturn(true);
-        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
-        final Map<String, Boolean> createdTopics = new HashMap<>();
-        createdTopics.put(ATLAS_HOOK_TOPIC, false);
-        createdTopics.put(ATLAS_ENTITIES_TOPIC, false);
-
-        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
-            @Override
-            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
-                return false;
-            }
+        KafkaUtils mockKafkaUtils = Mockito.mock(KafkaUtils.class);
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
 
-            @Override
-            protected ZkUtils createZkUtils(Configuration atlasProperties) {
-                return zookeeperUtils;
-            }
+        AtlasTopicCreator spyAtlasTopicCreator = Mockito.spy(atlasTopicCreator);
+        Mockito.doReturn(mockKafkaUtils).when(spyAtlasTopicCreator).getKafkaUtils(configuration);
+        Mockito.doReturn(false).when(spyAtlasTopicCreator).handleSecurity(configuration);
 
-            @Override
-            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
-                createdTopics.put(topicName, true);
-            }
+        spyAtlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
 
-            @Override
-            protected boolean handleSecurity(Configuration atlasProperties) {
-                return false;
-            }
-        };
-        atlasTopicCreator.createAtlasTopic(configuration, ATLAS_HOOK_TOPIC, ATLAS_ENTITIES_TOPIC);
-        assertFalse(createdTopics.get(ATLAS_HOOK_TOPIC));
-        assertFalse(createdTopics.get(ATLAS_ENTITIES_TOPIC));
+        try {
+            verify(mockKafkaUtils, times(0)).createTopics(anyList(), anyInt(), anyInt());
+        } catch (ExecutionException | InterruptedException e) {
+            Assert.fail("Caught exception while verifying createTopics: " + e.getMessage());
+        }
     }
 }
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 51c5a0d..24b6aa9 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -151,202 +151,6 @@ public class KafkaNotificationMockTest {
         }
     }
 
-    @Test
-    public void testSetKafkaJAASPropertiesForAllProperValues() {
-        Properties properties = new Properties();
-        Configuration configuration = new PropertiesConfiguration();
-
-        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
-        final String loginModuleControlFlag = "required";
-        final String optionUseKeyTab = "false";
-        final String optionStoreKey = "true";
-        final String optionServiceName = "kafka";
-
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
-        try {
-            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
-            kafkaNotification.setKafkaJAASProperties(configuration, properties);
-            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
-            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
-            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
-        } catch (AtlasException e) {
-            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
-        }
-
-    }
-
-    @Test
-    public void testSetKafkaJAASPropertiesForMissingControlFlag() {
-        Properties properties = new Properties();
-        Configuration configuration = new PropertiesConfiguration();
-
-        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
-        final String loginModuleControlFlag = "required";
-        final String optionUseKeyTab = "false";
-        final String optionStoreKey = "true";
-        final String optionServiceName = "kafka";
-
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
-        try {
-            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
-            kafkaNotification.setKafkaJAASProperties(configuration, properties);
-            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
-            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
-            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
-        } catch (AtlasException e) {
-            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
-        }
-
-    }
-
-    @Test
-    public void testSetKafkaJAASPropertiesForMissingLoginModuleName() {
-        Properties properties = new Properties();
-        Configuration configuration = new PropertiesConfiguration();
-
-        final String loginModuleControlFlag = "required";
-        final String optionUseKeyTab = "false";
-        final String optionStoreKey = "true";
-        final String optionServiceName = "kafka";
-
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
-        try {
-            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
-            kafkaNotification.setKafkaJAASProperties(configuration, properties);
-            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
-            assertNull(newPropertyValue);
-        } catch (AtlasException e) {
-            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
-        }
-
-    }
-
-    @Test
-    public void testSetKafkaJAASPropertiesWithSpecialCharacters() {
-        Properties properties = new Properties();
-        Configuration configuration = new PropertiesConfiguration();
-
-        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
-        final String loginModuleControlFlag = "required";
-        final String optionKeyTabPath = "/path/to/file.keytab";
-        final String optionPrincipal = "test/_HOST@EXAMPLE.COM";
-
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.keyTabPath", optionKeyTabPath);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.principal", optionPrincipal);
-
-        try {
-            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
-            kafkaNotification.setKafkaJAASProperties(configuration, properties);
-            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-            String updatedPrincipalValue = org.apache.hadoop.security.SecurityUtil.getServerPrincipal(optionPrincipal, (String) null);
-
-            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
-            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("keyTabPath=\"" + optionKeyTabPath + "\""));
-            assertTrue(newPropertyValue.contains("principal=\""+ updatedPrincipalValue + "\""));
-
-        } catch (AtlasException e) {
-            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
-        } catch (IOException e) {
-            fail("Failed while getting updated principal value with exception : " + e.getMessage());
-        }
-
-    }
-
-    @Test
-    public void testSetKafkaJAASPropertiesForTicketBasedLoginConfig() {
-        Properties properties = new Properties();
-        Configuration configuration = new PropertiesConfiguration();
-
-        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
-        final String loginModuleControlFlag = "required";
-        final String optionUseKeyTab = "false";
-        final String optionStoreKey = "true";
-        final String optionServiceName = "kafka";
-
-        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleName",loginModuleName);
-        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
-        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.useKeyTab", optionUseKeyTab);
-        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.storeKey", optionStoreKey);
-        configuration.setProperty("atlas.jaas.ticketBased-KafkaClient.option.serviceName",optionServiceName);
-
-        try {
-            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
-            KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
-            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
-            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
-            spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
-            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
-            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
-            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
-        } catch (AtlasException e) {
-            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
-        }
-    }
-
-    @Test
-    public void testSetKafkaJAASPropertiesForTicketBasedLoginFallback() {
-        Properties properties = new Properties();
-        Configuration configuration = new PropertiesConfiguration();
-
-        final String loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
-        final String loginModuleControlFlag = "required";
-        final String optionUseKeyTab = "false";
-        final String optionStoreKey = "true";
-        final String optionServiceName = "kafka";
-
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleName",loginModuleName);
-        configuration.setProperty("atlas.jaas.KafkaClient.loginModuleControlFlag", loginModuleControlFlag);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.useKeyTab", optionUseKeyTab);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.storeKey", optionStoreKey);
-        configuration.setProperty("atlas.jaas.KafkaClient.option.serviceName",optionServiceName);
-
-        try {
-            KafkaNotification kafkaNotification = new KafkaNotification(configuration);
-            KafkaNotification spyKafkaNotification = Mockito.spy(kafkaNotification);
-            when(spyKafkaNotification.isLoginKeytabBased()).thenReturn(false);
-            when(spyKafkaNotification.isLoginTicketBased()).thenReturn(true);
-            spyKafkaNotification.setKafkaJAASProperties(configuration, properties);
-            String newPropertyValue = properties.getProperty(KafkaNotification.KAFKA_SASL_JAAS_CONFIG_PROPERTY);
-
-            assertTrue(newPropertyValue.contains(loginModuleName), "loginModuleName not present in new property");
-            assertTrue(newPropertyValue.contains(loginModuleControlFlag),"loginModuleControlFlag not present in new property");
-            assertTrue(newPropertyValue.contains("useKeyTab=" + optionUseKeyTab), "useKeyTab not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("storeKey="+ optionStoreKey), "storeKey not present in new property or value doesn't match");
-            assertTrue(newPropertyValue.contains("serviceName=" + optionServiceName), "serviceName not present in new property or value doesn't match");
-        } catch (AtlasException e) {
-            fail("Failed while creating KafkaNotification object with exception : " + e.getMessage());
-        }
-    }
-
     class TestKafkaNotification extends KafkaNotification {
 
         private final AtlasKafkaConsumer consumer1;
diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
index 1ae98ce..2d2775b 100644
--- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/BulkImportPercentTest.java
@@ -55,7 +55,7 @@ public class BulkImportPercentTest {
                 percentHolder.add(d.intValue());
                 return null;
             }
-        }).when(log).info(anyString(), anyFloat(), anyInt(), anyString());
+        }).when(log).info(anyString(), anyInt(), anyLong(), anyString());
     }
 
     @Test
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 3774064..15a1900 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -140,7 +140,7 @@ public class NotificationHookConsumerTest {
 
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1));
 
-        verify(consumer).commit(any(TopicPartition.class), anyInt());
+        verify(consumer).commit(any(TopicPartition.class), anyLong());
     }
 
     @Test
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java b/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
index 0e48509..385f250 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/CuratorFactoryTest.java
@@ -82,8 +82,7 @@ public class CuratorFactoryTest {
         curatorFactory.enhanceBuilderWithSecurityParameters(zookeeperProperties, builder);
         verify(builder).aclProvider(argThat(new ArgumentMatcher<ACLProvider>() {
             @Override
-            public boolean matches(Object o) {
-                ACLProvider aclProvider = (ACLProvider) o;
+            public boolean matches(ACLProvider aclProvider) {
                 ACL acl = aclProvider.getDefaultAcl().get(0);
                 return acl.getId().getId().equals("myclient@EXAMPLE.COM")
                         && acl.getId().getScheme().equals("sasl");