You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/12/16 06:12:28 UTC

[atlas] 02/03: ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms

This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit ab92cf1e6db021e83d8b42ac559f077ab17afa65
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Thu Dec 3 16:48:14 2020 +0100

    ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms
    
    Also removes the Kafka core dependency from kafka-bridge, since it is no longer used
    
    Signed-off-by: nixonrodrigues <ni...@apache.org>
---
 addons/kafka-bridge/pom.xml                        |  19 ---
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java |  21 +--
 .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 170 +++++++++++----------
 notification/pom.xml                               |  10 --
 pom.xml                                            |   7 -
 5 files changed, 99 insertions(+), 128 deletions(-)

diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index b4846de..7fe97eb 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -103,20 +103,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-            <version>${zkclient.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
-            <version>${kafka.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-webapp</artifactId>
             <version>${jetty.version}</version>
@@ -206,11 +192,6 @@
                                         </artifactItem>
                                         <artifactItem>
                                             <groupId>org.apache.kafka</groupId>
-                                            <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
-                                            <version>${kafka.version}</version>
-                                        </artifactItem>
-                                        <artifactItem>
-                                            <groupId>org.apache.kafka</groupId>
                                             <artifactId>kafka-clients</artifactId>
                                             <version>${kafka.version}</version>
                                         </artifactItem>
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 833a077..d22010d 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -82,7 +82,7 @@ public class KafkaBridge {
     public static void main(String[] args) {
         int exitCode = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
-        KafkaBridge importer = null;
+        KafkaUtils kafkaUtils = null;
 
         try {
             Options options = new Options();
@@ -111,8 +111,9 @@ public class KafkaBridge {
                 atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
             }
 
-            importer = new KafkaBridge(atlasConf, atlasClientV2);
+            kafkaUtils = new KafkaUtils(atlasConf);
 
+            KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
             if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
 
@@ -146,25 +147,19 @@ public class KafkaBridge {
             if (atlasClientV2 != null) {
                 atlasClientV2.close();
             }
-            if (importer != null) {
-                importer.close();
+            if (kafkaUtils != null) {
+                kafkaUtils.close();
             }
         }
 
         System.exit(exitCode);
     }
 
-    public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
+    public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2, KafkaUtils kafkaUtils) throws Exception {
         this.atlasClientV2     = atlasClientV2;
         this.metadataNamespace = getMetadataNamespace(atlasConf);
-        this.kafkaUtils        = new KafkaUtils(atlasConf);
-        this.availableTopics   = kafkaUtils.listAllTopics();
-    }
-
-    public void close() {
-        if (this.kafkaUtils != null) {
-            this.kafkaUtils.close();
-        }
+        this.kafkaUtils        = kafkaUtils;
+        this.availableTopics   = this.kafkaUtils.listAllTopics();
     }
 
     private String getMetadataNamespace(Configuration config) {
diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index c8cc85c..f86ceb5 100644
--- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -18,115 +18,127 @@
 
 package org.apache.atlas.kafka.bridge;
 
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.atlas.AtlasClient;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasServiceException;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.atlas.kafka.bridge.KafkaBridge;
 import org.apache.atlas.kafka.model.KafkaDataTypes;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.mockito.Mock;
+import org.apache.atlas.utils.KafkaUtils;
+import org.mockito.ArgumentCaptor;
 import org.mockito.MockitoAnnotations;
-import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import scala.Option;
-import scala.collection.JavaConverters;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 
 public class KafkaBridgeTest {
 
     private static final String TEST_TOPIC_NAME = "test_topic";
-    public static final String CLUSTER_NAME = "primary";
-
-    @Mock
-    private ZkClient zkClient;
-
-    @Mock
-    private ZkConnection zkConnection;
-
-    @Mock
-    private AtlasClient atlasClient;
-
-    @Mock
-    private AtlasClientV2 atlasClientV2;
-
-    @Mock
-    private AtlasEntity atlasEntity;
-
-    @Mock
-    EntityMutationResponse entityMutationResponse;
-
-    @Mock
-    KafkaBridge kafkaBridge;
+    public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+            getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
+    private static final String CLUSTER_NAME = "primary";
+    private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
 
     @BeforeMethod
     public void initializeMocks() {
         MockitoAnnotations.initMocks(this);
     }
 
-
-    @Test
-    public void testImportTopic() throws Exception {
-
-        List<String> topics = setupTopic(zkClient, TEST_TOPIC_NAME);
-
-        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(
-                getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
-        KafkaBridge kafkaBridge = mock(KafkaBridge.class);
-        when(kafkaBridge.createEntityInAtlas(atlasEntityWithExtInfo)).thenReturn(atlasEntityWithExtInfo);
-
-        try {
-            kafkaBridge.importTopic(TEST_TOPIC_NAME);
-        } catch (Exception e) {
-            Assert.fail("KafkaBridge import failed ", e);
-        }
-    }
-
-    private void returnExistingTopic(String topicName, AtlasClientV2 atlasClientV2, String clusterName)
-            throws AtlasServiceException {
-
-        when(atlasClientV2.getEntityByAttribute(KafkaDataTypes.KAFKA_TOPIC.getName(),
-                Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        getTopicQualifiedName(TEST_TOPIC_NAME,CLUSTER_NAME))))
-                .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
-                        getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"))));
-
-    }
-
-    private List<String> setupTopic(ZkClient zkClient, String topicName) {
-        List<String> topics = new ArrayList<>();
-        topics.add(topicName);
-        ZkUtils zkUtils = mock(ZkUtils.class);
-        when(zkUtils.getAllTopics()).thenReturn(JavaConverters.asScalaIteratorConverter(topics.iterator()).asScala().toSeq());
-        return topics;
-    }
-
-    private AtlasEntity getTopicEntityWithGuid(String guid) {
+    private static AtlasEntity getTopicEntityWithGuid(String guid) {
         AtlasEntity ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
         ret.setGuid(guid);
         return ret;
     }
 
-    private AtlasEntity createTopicReference() {
-        AtlasEntity topicEntity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
-        return topicEntity;
+    @Test
+    public void testImportTopic() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockCreateResponse.getCreatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.createEntity(any()))
+                .thenReturn(mockCreateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        bridge.importTopic(TEST_TOPIC_NAME);
+
+        ArgumentCaptor<AtlasEntity.AtlasEntityWithExtInfo> argumentCaptor = ArgumentCaptor.forClass(AtlasEntity.AtlasEntityWithExtInfo.class);
+        verify(mockAtlasClientV2).createEntity(argumentCaptor.capture());
+        AtlasEntity.AtlasEntityWithExtInfo entity = argumentCaptor.getValue();
+        assertEquals(entity.getEntity().getAttribute("qualifiedName"), TOPIC_QUALIFIED_NAME);
     }
 
-    private String createTestTopic(String testTopic) {
-        return new String(testTopic);
+    @Test
+    public void testCreateTopic() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockCreateResponse.getCreatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.createEntity(any()))
+                .thenReturn(mockCreateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
+
+        assertEquals(TOPIC_WITH_EXT_INFO, ret);
     }
 
-    private static String getTopicQualifiedName(String clusterName, String topic) {
-        return String.format("%s@%s", topic.toLowerCase(), clusterName);
+    @Test
+    public void testUpdateTopic() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockUpdateResponse.getUpdatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.KAFKA_TOPIC.getName()), any()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+        when(mockAtlasClientV2.updateEntity(any()))
+                .thenReturn(mockUpdateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(TOPIC_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
+
+        assertEquals(TOPIC_WITH_EXT_INFO, ret);
     }
 }
\ No newline at end of file
diff --git a/notification/pom.xml b/notification/pom.xml
index 971116f..28d13bc 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -83,11 +83,6 @@
         </dependency>
 
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-intg</artifactId>
             <classifier>tests</classifier>
@@ -179,11 +174,6 @@
                                     <version>${kafka.version}</version>
                                 </artifactItem>
                                 <artifactItem>
-                                    <groupId>com.101tec</groupId>
-                                    <artifactId>zkclient</artifactId>
-                                    <version>${zkclient.version}</version>
-                                </artifactItem>
-                                <artifactItem>
                                     <groupId>org.apache.zookeeper</groupId>
                                     <artifactId>zookeeper</artifactId>
                                     <version>3.4.6</version>
diff --git a/pom.xml b/pom.xml
index 8cb5778..b1951f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -747,7 +747,6 @@
         <testng.version>6.9.4</testng.version>
         <tinkerpop.version>3.4.6</tinkerpop.version>
         <woodstox-core.version>5.0.3</woodstox-core.version>
-        <zkclient.version>0.8</zkclient.version>
         <zookeeper.version>3.4.6</zookeeper.version>
     </properties>
 
@@ -1657,12 +1656,6 @@
                 <version>${project.version}</version>
             </dependency>
 
-            <dependency>
-                <groupId>com.101tec</groupId>
-                <artifactId>zkclient</artifactId>
-                <version>${zkclient.version}</version>
-            </dependency>
-          
           <!-- Fix for cassandra-all tranitive dependency CVE-2017-18640 : https://nvd.nist.gov/vuln/detail/CVE-2017-18640 -->
             <dependency>
                 <groupId>org.yaml</groupId>