You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2020/12/02 19:35:07 UTC

[atlas] branch branch-2.0 updated: ATLAS-4043: Added option to list internal kafka topic as well through Kafka AdminClient API

This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ee1f84a  ATLAS-4043: Added option to list internal kafka topic as well through Kafka AdminClient API
ee1f84a is described below

commit ee1f84ac29bc24ee2c55abcd7f213b56c080482c
Author: Deep Singh <de...@gmail.com>
AuthorDate: Mon Nov 30 12:57:39 2020 -0600

    ATLAS-4043: Added option to list internal kafka topic as well through Kafka AdminClient API
    
    Signed-off-by: Sarath Subramanian <sa...@apache.org>
    (cherry picked from commit cbc3427133bfc1f119685b33f6d61f8d34f9f3ab)
---
 common/src/main/java/org/apache/atlas/utils/KafkaUtils.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index 7a397b1..7e9f789 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.errors.TopicExistsException;
@@ -57,6 +58,7 @@ public class KafkaUtils implements AutoCloseable {
     private static final String JAAS_PRINCIPAL_PROP = "principal";
     private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
     private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
+    private static final String IMPORT_INTERNAL_TOPICS = "atlas.hook.kafka.import.internal.topics";
 
     public static final String ATLAS_KAFKA_PROPERTY_PREFIX   = "atlas.kafka";
 
@@ -64,6 +66,8 @@ public class KafkaUtils implements AutoCloseable {
 
     final protected AdminClient adminClient;
 
+    final protected boolean importInternalTopics;
+
     public KafkaUtils(Configuration atlasConfiguration) {
         if(LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils() ");
@@ -72,6 +76,7 @@ public class KafkaUtils implements AutoCloseable {
 
         setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
         adminClient = AdminClient.create(this.kafkaConfiguration);
+        importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
 
         if(LOG.isDebugEnabled()) {
             LOG.debug("<== KafkaUtils() ");
@@ -106,7 +111,7 @@ public class KafkaUtils implements AutoCloseable {
         if(LOG.isDebugEnabled()) {
             LOG.debug("==> KafkaUtils.listAllTopics() ");
         }
-        ListTopicsResult listTopicsResult = adminClient.listTopics();
+        ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics));
         List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get());
 
         if(LOG.isDebugEnabled()) {