You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2020/12/02 19:35:07 UTC
[atlas] branch branch-2.0 updated: ATLAS-4043: Added option to list
internal kafka topic as well through Kafka AdminClient API
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new ee1f84a ATLAS-4043: Added option to list internal kafka topic as well through Kafka AdminClient API
ee1f84a is described below
commit ee1f84ac29bc24ee2c55abcd7f213b56c080482c
Author: Deep Singh <de...@gmail.com>
AuthorDate: Mon Nov 30 12:57:39 2020 -0600
ATLAS-4043: Added option to list internal kafka topic as well through Kafka AdminClient API
Signed-off-by: Sarath Subramanian <sa...@apache.org>
(cherry picked from commit cbc3427133bfc1f119685b33f6d61f8d34f9f3ab)
---
common/src/main/java/org/apache/atlas/utils/KafkaUtils.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index 7a397b1..7e9f789 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TopicExistsException;
@@ -57,6 +58,7 @@ public class KafkaUtils implements AutoCloseable {
private static final String JAAS_PRINCIPAL_PROP = "principal";
private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
+ private static final String IMPORT_INTERNAL_TOPICS = "atlas.hook.kafka.import.internal.topics";
public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
@@ -64,6 +66,8 @@ public class KafkaUtils implements AutoCloseable {
final protected AdminClient adminClient;
+ final protected boolean importInternalTopics;
+
public KafkaUtils(Configuration atlasConfiguration) {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils() ");
@@ -72,6 +76,7 @@ public class KafkaUtils implements AutoCloseable {
setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
adminClient = AdminClient.create(this.kafkaConfiguration);
+ importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
if(LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils() ");
@@ -106,7 +111,7 @@ public class KafkaUtils implements AutoCloseable {
if(LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.listAllTopics() ");
}
- ListTopicsResult listTopicsResult = adminClient.listTopics();
+ ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics));
List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get());
if(LOG.isDebugEnabled()) {