You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ni...@apache.org on 2020/12/16 06:12:28 UTC
[atlas] 02/03: ATLAS-3864: Follow up change,
removed remaining usage of zkclient even from poms
This is an automated email from the ASF dual-hosted git repository.
nixon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit ab92cf1e6db021e83d8b42ac559f077ab17afa65
Author: Andras Katona <ak...@cloudera.com>
AuthorDate: Thu Dec 3 16:48:14 2020 +0100
ATLAS-3864: Follow up change, removed remaining usage of zkclient even from poms
Also removing kafka core dependency from kafka-bridge since it's not used any more
Signed-off-by: nixonrodrigues <ni...@apache.org>
---
addons/kafka-bridge/pom.xml | 19 ---
.../org/apache/atlas/kafka/bridge/KafkaBridge.java | 21 +--
.../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 170 +++++++++++----------
notification/pom.xml | 10 --
pom.xml | 7 -
5 files changed, 99 insertions(+), 128 deletions(-)
diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index b4846de..7fe97eb 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -103,20 +103,6 @@
</dependency>
<dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>${zkclient.version}</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jetty.version}</version>
@@ -206,11 +192,6 @@
</artifactItem>
<artifactItem>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_${kafka.scala.binary.version}</artifactId>
- <version>${kafka.version}</version>
- </artifactItem>
- <artifactItem>
- <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</artifactItem>
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index 833a077..d22010d 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -82,7 +82,7 @@ public class KafkaBridge {
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
- KafkaBridge importer = null;
+ KafkaUtils kafkaUtils = null;
try {
Options options = new Options();
@@ -111,8 +111,9 @@ public class KafkaBridge {
atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), urls);
}
- importer = new KafkaBridge(atlasConf, atlasClientV2);
+ kafkaUtils = new KafkaUtils(atlasConf);
+ KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
@@ -146,25 +147,19 @@ public class KafkaBridge {
if (atlasClientV2 != null) {
atlasClientV2.close();
}
- if (importer != null) {
- importer.close();
+ if (kafkaUtils != null) {
+ kafkaUtils.close();
}
}
System.exit(exitCode);
}
- public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2) throws Exception {
+ public KafkaBridge(Configuration atlasConf, AtlasClientV2 atlasClientV2, KafkaUtils kafkaUtils) throws Exception {
this.atlasClientV2 = atlasClientV2;
this.metadataNamespace = getMetadataNamespace(atlasConf);
- this.kafkaUtils = new KafkaUtils(atlasConf);
- this.availableTopics = kafkaUtils.listAllTopics();
- }
-
- public void close() {
- if (this.kafkaUtils != null) {
- this.kafkaUtils.close();
- }
+ this.kafkaUtils = kafkaUtils;
+ this.availableTopics = this.kafkaUtils.listAllTopics();
}
private String getMetadataNamespace(Configuration config) {
diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index c8cc85c..f86ceb5 100644
--- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -18,115 +18,127 @@
package org.apache.atlas.kafka.bridge;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.atlas.AtlasClient;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasServiceException;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.atlas.kafka.bridge.KafkaBridge;
import org.apache.atlas.kafka.model.KafkaDataTypes;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.mockito.Mock;
+import org.apache.atlas.utils.KafkaUtils;
+import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
-import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import scala.Option;
-import scala.collection.JavaConverters;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
public class KafkaBridgeTest {
private static final String TEST_TOPIC_NAME = "test_topic";
- public static final String CLUSTER_NAME = "primary";
-
- @Mock
- private ZkClient zkClient;
-
- @Mock
- private ZkConnection zkConnection;
-
- @Mock
- private AtlasClient atlasClient;
-
- @Mock
- private AtlasClientV2 atlasClientV2;
-
- @Mock
- private AtlasEntity atlasEntity;
-
- @Mock
- EntityMutationResponse entityMutationResponse;
-
- @Mock
- KafkaBridge kafkaBridge;
+ public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+ getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
+ private static final String CLUSTER_NAME = "primary";
+ private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
@BeforeMethod
public void initializeMocks() {
MockitoAnnotations.initMocks(this);
}
-
- @Test
- public void testImportTopic() throws Exception {
-
- List<String> topics = setupTopic(zkClient, TEST_TOPIC_NAME);
-
- AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = new AtlasEntity.AtlasEntityWithExtInfo(
- getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
- KafkaBridge kafkaBridge = mock(KafkaBridge.class);
- when(kafkaBridge.createEntityInAtlas(atlasEntityWithExtInfo)).thenReturn(atlasEntityWithExtInfo);
-
- try {
- kafkaBridge.importTopic(TEST_TOPIC_NAME);
- } catch (Exception e) {
- Assert.fail("KafkaBridge import failed ", e);
- }
- }
-
- private void returnExistingTopic(String topicName, AtlasClientV2 atlasClientV2, String clusterName)
- throws AtlasServiceException {
-
- when(atlasClientV2.getEntityByAttribute(KafkaDataTypes.KAFKA_TOPIC.getName(),
- Collections.singletonMap(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
- getTopicQualifiedName(TEST_TOPIC_NAME,CLUSTER_NAME))))
- .thenReturn((new AtlasEntity.AtlasEntityWithExtInfo(
- getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"))));
-
- }
-
- private List<String> setupTopic(ZkClient zkClient, String topicName) {
- List<String> topics = new ArrayList<>();
- topics.add(topicName);
- ZkUtils zkUtils = mock(ZkUtils.class);
- when(zkUtils.getAllTopics()).thenReturn(JavaConverters.asScalaIteratorConverter(topics.iterator()).asScala().toSeq());
- return topics;
- }
-
- private AtlasEntity getTopicEntityWithGuid(String guid) {
+ private static AtlasEntity getTopicEntityWithGuid(String guid) {
AtlasEntity ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
ret.setGuid(guid);
return ret;
}
- private AtlasEntity createTopicReference() {
- AtlasEntity topicEntity = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
- return topicEntity;
+ @Test
+ public void testImportTopic() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockCreateResponse.getCreatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.createEntity(any()))
+ .thenReturn(mockCreateResponse);
+ when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(TOPIC_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ bridge.importTopic(TEST_TOPIC_NAME);
+
+ ArgumentCaptor<AtlasEntity.AtlasEntityWithExtInfo> argumentCaptor = ArgumentCaptor.forClass(AtlasEntity.AtlasEntityWithExtInfo.class);
+ verify(mockAtlasClientV2).createEntity(argumentCaptor.capture());
+ AtlasEntity.AtlasEntityWithExtInfo entity = argumentCaptor.getValue();
+ assertEquals(entity.getEntity().getAttribute("qualifiedName"), TOPIC_QUALIFIED_NAME);
}
- private String createTestTopic(String testTopic) {
- return new String(testTopic);
+ @Test
+ public void testCreateTopic() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockCreateResponse.getCreatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.createEntity(any()))
+ .thenReturn(mockCreateResponse);
+ when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(TOPIC_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
+
+ assertEquals(TOPIC_WITH_EXT_INFO, ret);
}
- private static String getTopicQualifiedName(String clusterName, String topic) {
- return String.format("%s@%s", topic.toLowerCase(), clusterName);
+ @Test
+ public void testUpdateTopic() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeader.getGuid()).thenReturn(TOPIC_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockUpdateResponse.getUpdatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.KAFKA_TOPIC.getName()), any()))
+ .thenReturn(TOPIC_WITH_EXT_INFO);
+ when(mockAtlasClientV2.updateEntity(any()))
+ .thenReturn(mockUpdateResponse);
+ when(mockAtlasClientV2.getEntityByGuid(TOPIC_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(TOPIC_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateTopic(TEST_TOPIC_NAME);
+
+ assertEquals(TOPIC_WITH_EXT_INFO, ret);
}
}
\ No newline at end of file
diff --git a/notification/pom.xml b/notification/pom.xml
index 971116f..28d13bc 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -83,11 +83,6 @@
</dependency>
<dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-intg</artifactId>
<classifier>tests</classifier>
@@ -179,11 +174,6 @@
<version>${kafka.version}</version>
</artifactItem>
<artifactItem>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>${zkclient.version}</version>
- </artifactItem>
- <artifactItem>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
diff --git a/pom.xml b/pom.xml
index 8cb5778..b1951f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -747,7 +747,6 @@
<testng.version>6.9.4</testng.version>
<tinkerpop.version>3.4.6</tinkerpop.version>
<woodstox-core.version>5.0.3</woodstox-core.version>
- <zkclient.version>0.8</zkclient.version>
<zookeeper.version>3.4.6</zookeeper.version>
</properties>
@@ -1657,12 +1656,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>${zkclient.version}</version>
- </dependency>
-
<!-- Fix for cassandra-all tranitive dependency CVE-2017-18640 : https://nvd.nist.gov/vuln/detail/CVE-2017-18640 -->
<dependency>
<groupId>org.yaml</groupId>