You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/12/16 06:16:36 UTC

[atlas] branch branch-2.0 updated (6076f02 -> 66ed1b6)

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from 6076f02  ATLAS-4082: Added more attributes like create or last modify time, contentLength etc. to ADLS Gen2 types
     new afb6b2b  ATLAS-4063: EmbeddedKafkaServer simplification
     new f9d4b22  ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms
     new 66ed1b6  Atlas-4078 master:UI - Lineage tab not shown on UI for entity with type Dataset or Process

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 addons/kafka-bridge/pom.xml                        |  19 ---
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  21 +--
 .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 170 +++++++++++----------
 .../js/views/detail_page/DetailPageLayoutView.js   |   6 +-
 .../js/views/detail_page/DetailPageLayoutView.js   |   8 +-
 notification/pom.xml                               |  10 --
 .../apache/atlas/kafka/EmbeddedKafkaServer.java    |  37 +----
 pom.xml                                            |   7 -
 8 files changed, 112 insertions(+), 166 deletions(-)


[atlas] 01/03: ATLAS-4063: EmbeddedKafkaServer simplification

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit afb6b2b6292cdaf81b3468737247dcce4074e8b2
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Mon Dec 7 09:25:42 2020 +0100

    ATLAS-4063: EmbeddedKafkaServer simplification
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
    (cherry picked from commit 1907f6408c4c499a0cc7b4bf016571a4b752a7f8)
---
 .../apache/atlas/kafka/EmbeddedKafkaServer.java    | 37 ++--------------------
 1 file changed, 2 insertions(+), 35 deletions(-)

diff --git a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
index 235b7ce..b793b9a 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/EmbeddedKafkaServer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.atlas.kafka;
 
-import kafka.metrics.KafkaMetricsReporter;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import org.apache.atlas.ApplicationProperties;
@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 import scala.Option;
-import scala.collection.mutable.Buffer;
+import scala.collection.mutable.ArrayBuffer;
 
 import javax.inject.Inject;
 import java.io.File;
@@ -45,7 +44,6 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 
 
 @Component
@@ -138,10 +136,7 @@ public class EmbeddedKafkaServer implements Service {
         brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
         brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
 
-        List<KafkaMetricsReporter>   metrics          = new ArrayList<>();
-        Buffer<KafkaMetricsReporter> metricsReporters = scala.collection.JavaConversions.asScalaBuffer(metrics);
-
-        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), new SystemTime(), Option.apply(this.getClass().getName()), metricsReporters);
+        kafkaServer = new KafkaServer(KafkaConfig.fromProps(brokerConfig), Time.SYSTEM, Option.apply(this.getClass().getName()), new ArrayBuffer<>());
 
         kafkaServer.startup();
 
@@ -165,32 +160,4 @@ public class EmbeddedKafkaServer implements Service {
             return new URL("http://" + url);
         }
     }
-
-
-    // ----- inner class : SystemTime ----------------------------------------
-    private static class SystemTime implements Time {
-        @Override
-        public long milliseconds() {
-            return System.currentTimeMillis();
-        }
-
-        @Override
-        public long nanoseconds() {
-            return System.nanoTime();
-        }
-
-        @Override
-        public long hiResClockMs() {
-            return TimeUnit.NANOSECONDS.toMillis(nanoseconds());
-        }
-
-        @Override
-        public void sleep(long arg0) {
-            try {
-                Thread.sleep(arg0);
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
 }


[atlas] 03/03: Atlas-4078 master:UI - Lineage tab not shown on UI for entity with type Dataset or Process

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 66ed1b6d83407f98c4a12c39600f6f6d9f97ef1f
Author: prasad pawar <pr...@freestoneinfotech.com>
AuthorDate: Tue Dec 15 11:29:53 2020 +0530

    Atlas-4078 master:UI - Lineage tab not shown on UI for entity with type Dataset or Process
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
    (cherry picked from commit 2d91fc1ba393265b8ec6bc3a5e29280ff716c98f)
---
 dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js | 6 +++++-
 dashboardv3/public/js/views/detail_page/DetailPageLayoutView.js | 8 ++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
index a6fcd8c..b878f6d 100644
--- a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
+++ b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
@@ -167,6 +167,7 @@ define(['require',
 
                     // check if entity is process
                     var isProcess = false,
+                        typeName = Utils.getName(collectionJSON, 'typeName'),
                         superTypes = Utils.getNestedSuperTypes({ data: this.activeEntityDef.toJSON(), collection: this.entityDefCollection }),
                         isLineageRender = _.find(superTypes, function(type) {
                             if (type === "DataSet" || type === "Process") {
@@ -176,6 +177,9 @@ define(['require',
                                 return true;
                             }
                         });
+                    if (!isLineageRender) {
+                        isLineageRender = (typeName === "DataSet" || typeName === "Process") ? true : null;
+                    }
 
                     if (collectionJSON && collectionJSON.guid) {
                         var tagGuid = collectionJSON.guid;
@@ -393,7 +397,7 @@ define(['require',
                     case "raudits":
                         regionRef = this.RReplicationAuditTableLayoutView;
                         break;
-                     case "profile":
+                    case "profile":
                         regionRef = this.RProfileLayoutView;
                         break;
                 }
diff --git a/dashboardv3/public/js/views/detail_page/DetailPageLayoutView.js b/dashboardv3/public/js/views/detail_page/DetailPageLayoutView.js
index 4a7cc3a..dbbbcde 100644
--- a/dashboardv3/public/js/views/detail_page/DetailPageLayoutView.js
+++ b/dashboardv3/public/js/views/detail_page/DetailPageLayoutView.js
@@ -171,6 +171,7 @@ define(['require',
 
                     // check if entity is process
                     var isProcess = false,
+                        typeName = Utils.getName(collectionJSON, 'typeName'),
                         superTypes = Utils.getNestedSuperTypes({ data: this.activeEntityDef.toJSON(), collection: this.entityDefCollection }),
                         isLineageRender = _.find(superTypes, function(type) {
                             if (type === "DataSet" || type === "Process") {
@@ -180,7 +181,9 @@ define(['require',
                                 return true;
                             }
                         });
-
+                    if (!isLineageRender) {
+                        isLineageRender = (typeName === "DataSet" || typeName === "Process") ? true : null;
+                    }
                     if (collectionJSON && collectionJSON.guid) {
                         var tagGuid = collectionJSON.guid;
                         this.readOnly = Enums.entityStateReadOnly[collectionJSON.status];
@@ -194,6 +197,7 @@ define(['require',
                     }
                     if (collectionJSON) {
                         this.name = Utils.getName(collectionJSON);
+
                         if (collectionJSON.attributes) {
                             if (collectionJSON.typeName) {
                                 collectionJSON.attributes.typeName = _.escape(collectionJSON.typeName);
@@ -397,7 +401,7 @@ define(['require',
                     case "raudits":
                         regionRef = this.RReplicationAuditTableLayoutView;
                         break;
-                     case "profile":
+                    case "profile":
                         regionRef = this.RProfileLayoutView;
                         break;
                 }


[atlas] 02/03: ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms

Posted by ni...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit f9d4b22d4f07bdcd4df3cce1e9b1c34c764bec2f
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Thu Dec 3 16:48:14 2020 +0100

    ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms
    
    Also removing kafka core dependency from kafka-bridge since it's not used any more
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
    (cherry picked from commit ab92cf1e6db021e83d8b42ac559f077ab17afa65)
---
 addons/kafka-bridge/pom.xml                        |  19 ---
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  21 +--
 .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 170 +++++++++++----------
 notification/pom.xml                               |  10 --
 pom.xml                                            |   7 -
 5 files changed, 99 insertions(+), 128 deletions(-)

diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index 8c252d9..2f12c5e 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -103,20 +103,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-            <version>${zkclient.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
-            <version>${kafka.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-webapp</artifactId>
             <version>${jetty.version}</version>
@@ -206,11 +192,6 @@
                                         </artifactItem>
                                         <artifactItem>
                                             <groupId>org.apache.kafka</groupId>
-                                            <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
-                                            <version>${kafka.version}</version>
-                                        </artifactItem>
-                                        <artifactItem>
-                                            <groupId>org.apache.kafka</groupId>
                                             <artifactId>kafka-clients</artifactId>
                                             <version>${kafka.version}</version>
                                         </artifactItem>
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 833a077..d22010d 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -82,7 +82,7 @@ public class KafkaBridge {
     public static void main(String[] args) {
         int exitCode = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
-        KafkaBridge importer = null;
+        KafkaUtils kafkaUtils = null;
 
         try {
             Options options = new Options();
@@ -111,8 +111,9 @@ public class KafkaBridge {
                 atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
             }
 
-            importer = new KafkaBridge(atlasConf, atlasClientV2);
+            kafkaUtils = new KafkaUtils(atlasConf);
 
+            KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
             if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
 
@@ -146,25 +147,19 @@ public class KafkaBridge {
             if (atlasClientV2 != null) {
                 atlasClientV2.close();
             }
-            if (importer != null) {
-                importer.close();
+            if (kafkaUtils != null) {
+                kafkaUtils.close();
             }
         }
 
         System.exit(exitCode);
     }
 
-    public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
+    public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2, KafkaUtils kafkaUtils) throws Exception {
         this.atlasClientV2     = atlasClientV2;
         this.metadataNamespace = getMetadataNamespace(atlasConf);
-        this.kafkaUtils        = new KafkaUtils(atlasConf);
-        this.availableTopics   = kafkaUtils.listAllTopics();
-    }
-
-    public void close() {
-        if (this.kafkaUtils != null) {
-            this.kafkaUtils.close();
-        }
+        this.kafkaUtils        = kafkaUtils;
+        this.availableTopics   = this.kafkaUtils.listAllTopics();
     }
 
     private String getMetadataNamespace(Configuration config) {
diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index c8cc85c..f86ceb5 100644
--- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -18,115 +18,127 @@
 
 package org.apache.atlas.kafka.bridge;
 
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.atlas.AtlasClient;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasServiceException;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.atlas.kafka.bridge.KafkaBridge;
 import org.apache.atlas.kafka.model.KafkaDataTypes;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.mockito.Mock;
+import org.apache.atlas.utils.KafkaUtils;
+import org.mockito.ArgumentCaptor;
 import org.mockito.MockitoAnnotations;
-import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import scala.Option;
-import scala.collection.JavaConverters;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 
 public class KafkaBridgeTest {
 
     private static final String TEST_TOPIC_NAME = "test_topic";
-    public static final String CLUSTER_NAME = "primary";
-
-    @Mock
-    private ZkClient zkClient;
-
-    @Mock
-    private ZkConnection zkConnection;
-
-    @Mock
-    private AtlasClient atlasClient;
-
-    @Mock
-    private AtlasClientV2 atlasClientV2;
-
-    @Mock
-    private AtlasEntity atlasEntity;
-
-    @Mock
-    EntityMutationResponse entityMutationResponse;
-
-    @Mock
-    KafkaBridge kafkaBridge;
+    public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+            getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
+    private static final String CLUSTER_NAME = "primary";
+    private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
 
     @BeforeMethod
     public void initializeMocks() {
         MockitoAnnotations.initMocks(this);
     }
 
-
-    @Test
-    public void testImportTopic() throws Exception {
-
-        List<String> topics = setupTopic(zkClient, TEST_TOPIC_NAME);
-
-        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(
-                getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
-        KafkaBridge kafkaBridge = mock(KafkaBridge.class);
-        when(kafkaBridge.createEntityInAtlas(atlasEntityWithExtInfo)).thenReturn(atlasEntityWithExtInfo);
-
-        try {
-            kafkaBridge.importTopic(TEST_TOPIC_NAME);
-        } catch (Exception e) {
-            Assert.fail("KafkaBridge import failed ", e);
-        }
-    }
-
-    private void returnExistingTopic(String topicName, AtlasClientV2 atlasClientV2, String clusterName)
-            throws AtlasServiceException {
-
-        when(atlasClientV2.getEntityByAttribute(KafkaDataTypes.KAFKA_TOPIC.getName(),
-                Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        getTopicQualifiedName(TEST_TOPIC_NAME,CLUSTER_NAME))))
-                .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
-                        getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"))));
-
-    }
-
-    private List<String> setupTopic(ZkClient zkClient, String topicName) {
-        List<String> topics = new ArrayList<>();
-        topics.add(topicName);
-        ZkUtils zkUtils = mock(ZkUtils.class);
-        when(zkUtils.getAllTopics()).thenReturn(JavaConverters.asScalaIteratorConverter(topics.iterator()).asScala().toSeq());
-        return topics;
-    }
-
-    private AtlasEntity getTopicEntityWithGuid(String guid) {
+    private static AtlasEntity getTopicEntityWithGuid(String guid) {
         AtlasEntity ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
         ret.setGuid(guid);
         return ret;
     }
 
-    private AtlasEntity createTopicReference() {
-        AtlasEntity topicEntity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
-        return topicEntity;
+    @Test
+    public void testImportTopic() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockCreateResponse.getCreatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.createEntity(any()))
+                .thenReturn(mockCreateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        bridge.importTopic(TEST_TOPIC_NAME);
+
+        ArgumentCaptor<AtlasEntity.AtlasEntityWithExtInfo> argumentCaptor = ArgumentCaptor.forClass(AtlasEntity.AtlasEntityWithExtInfo.class);
+        verify(mockAtlasClientV2).createEntity(argumentCaptor.capture());
+        AtlasEntity.AtlasEntityWithExtInfo entity = argumentCaptor.getValue();
+        assertEquals(entity.getEntity().getAttribute("qualifiedName"), TOPIC_QUALIFIED_NAME);
     }
 
-    private String createTestTopic(String testTopic) {
-        return new String(testTopic);
+    @Test
+    public void testCreateTopic() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockCreateResponse.getCreatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.createEntity(any()))
+                .thenReturn(mockCreateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
+
+        assertEquals(TOPIC_WITH_EXT_INFO, ret);
     }
 
-    private static String getTopicQualifiedName(String clusterName, String topic) {
-        return String.format("%s@%s", topic.toLowerCase(), clusterName);
+    @Test
+    public void testUpdateTopic() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockUpdateResponse.getUpdatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.KAFKA_TOPIC.getName()), any()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+        when(mockAtlasClientV2.updateEntity(any()))
+                .thenReturn(mockUpdateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
+
+        assertEquals(TOPIC_WITH_EXT_INFO, ret);
     }
 }
\ No newline at end of file
diff --git a/notification/pom.xml b/notification/pom.xml
index 6afa22f..69ed245 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -83,11 +83,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-intg</artifactId>
             <classifier>tests</classifier>
@@ -179,11 +174,6 @@
                                     <version>${kafka.version}</version>
                                 </artifactItem>
                                 <artifactItem>
-                                    <groupId>com.101tec</groupId>
-                                    <artifactId>zkclient</artifactId>
-                                    <version>${zkclient.version}</version>
-                                </artifactItem>
-                                <artifactItem>
                                     <groupId>org.apache.zookeeper</groupId>
                                     <artifactId>zookeeper</artifactId>
                                     <version>3.4.6</version>
diff --git a/pom.xml b/pom.xml
index 81719e5..34fbe6f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -747,7 +747,6 @@
         <testng.version>6.9.4</testng.version>
         <tinkerpop.version>3.4.6</tinkerpop.version>
         <woodstox-core.version>5.0.3</woodstox-core.version>
-        <zkclient.version>0.8</zkclient.version>
         <zookeeper.version>3.4.6</zookeeper.version>
     </properties>
 
@@ -1657,12 +1656,6 @@
                 <version>${project.version}</version>
             </dependency>
 
-            <dependency>
-                <groupId>com.101tec</groupId>
-                <artifactId>zkclient</artifactId>
-                <version>${zkclient.version}</version>
-            </dependency>
-          
           <!-- Fix for cassandra-all transitive dependency CVE-2017-18640 : https://nvd.nist.gov/vuln/detail/CVE-2017-18640 -->
             <dependency>
                 <groupId>org.yaml</groupId>