You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by sa...@apache.org on 2021/01/28 18:58:07 UTC
[atlas] branch master updated: ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes
This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new ce1342d ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes
ce1342d is described below
commit ce1342dd6dd7cbc4d1e35bea3b26d08dc24bd67e
Author: Barnabas Maidics <b....@gmail.com>
AuthorDate: Mon Jan 25 14:26:45 2021 +0100
ATLAS-4111: Add 'replicationFactor' attribute to kafka_topic entity type + formatting changes
Signed-off-by: Sarath Subramanian <sa...@apache.org>
---
.../org/apache/atlas/kafka/bridge/KafkaBridge.java | 66 ++++----
.../patches/018-kafka_topic_add_rf_attribute.json | 23 +++
.../java/org/apache/atlas/utils/KafkaUtils.java | 173 ++++++++++++++-------
3 files changed, 174 insertions(+), 88 deletions(-)
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index d22010d..bf74c67 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -54,24 +54,23 @@ import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
public class KafkaBridge {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
-
- private static final int EXIT_CODE_SUCCESS = 0;
- private static final int EXIT_CODE_FAILED = 1;
- private static final String ATLAS_ENDPOINT = "atlas.rest.address";
- private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
- private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
- private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace";
- private static final String DEFAULT_CLUSTER_NAME = "primary";
- private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
- private static final String DESCRIPTION_ATTR = "description";
- private static final String PARTITION_COUNT = "partitionCount";
- private static final String NAME = "name";
- private static final String URI = "uri";
- private static final String CLUSTERNAME = "clusterName";
- private static final String TOPIC = "topic";
-
- private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final String ATLAS_ENDPOINT = "atlas.rest.address";
+ private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
+ private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
+ private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace";
+ private static final String DEFAULT_CLUSTER_NAME = "primary";
+ private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ private static final String DESCRIPTION_ATTR = "description";
+ private static final String PARTITION_COUNT = "partitionCount";
+ private static final String REPLICATION_FACTOR = "replicationFactor";
+ private static final String NAME = "name";
+ private static final String URI = "uri";
+ private static final String CLUSTERNAME = "clusterName";
+ private static final String TOPIC = "topic";
+ private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
private final List<String> availableTopics;
private final String metadataNamespace;
@@ -80,9 +79,9 @@ public class KafkaBridge {
public static void main(String[] args) {
- int exitCode = EXIT_CODE_FAILED;
+ int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
- KafkaUtils kafkaUtils = null;
+ KafkaUtils kafkaUtils = null;
try {
Options options = new Options();
@@ -114,14 +113,15 @@ public class KafkaBridge {
kafkaUtils = new KafkaUtils(atlasConf);
KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
+
if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
if (f.exists() && f.canRead()) {
BufferedReader br = new BufferedReader(new FileReader(f));
- String line = null;
+ String line;
- while((line = br.readLine()) != null) {
+ while ((line = br.readLine()) != null) {
topicToImport = line.trim();
importer.importTopic(topicToImport);
@@ -138,15 +138,19 @@ public class KafkaBridge {
}
} catch(ParseException e) {
LOG.error("Failed to parse arguments. Error: ", e.getMessage());
+
printUsage();
} catch(Exception e) {
System.out.println("ImportKafkaEntities failed. Please check the log file for the detailed error message");
+
e.printStackTrace();
+
LOG.error("ImportKafkaEntities failed", e);
} finally {
if (atlasClientV2 != null) {
atlasClientV2.close();
}
+
if (kafkaUtils != null) {
kafkaUtils.close();
}
@@ -175,16 +179,18 @@ public class KafkaBridge {
if (StringUtils.isNotEmpty(topicToImport)) {
List<String> topics_subset = new ArrayList<>();
- for(String topic : topics) {
+
+ for (String topic : topics) {
if (Pattern.compile(topicToImport).matcher(topic).matches()) {
topics_subset.add(topic);
}
}
+
topics = topics_subset;
}
if (CollectionUtils.isNotEmpty(topics)) {
- for(String topic : topics) {
+ for (String topic : topics) {
createOrUpdateTopic(topic);
}
}
@@ -234,11 +240,14 @@ public class KafkaBridge {
ret.setAttribute(NAME,topic);
ret.setAttribute(DESCRIPTION_ATTR, topic);
ret.setAttribute(URI, topic);
+
try {
ret.setAttribute(PARTITION_COUNT, kafkaUtils.getPartitionCount(topic));
+ ret.setAttribute(REPLICATION_FACTOR, kafkaUtils.getReplicationFactor(topic));
} catch (ExecutionException | InterruptedException e) {
- LOG.error("Error while getting partition count for topic :" + topic, e);
- throw new Exception("Error while getting partition count for topic :" + topic, e);
+ LOG.error("Error while getting partition data for topic :" + topic, e);
+
+ throw new Exception("Error while getting partition data for topic :" + topic, e);
}
return ret;
@@ -254,6 +263,7 @@ public class KafkaBridge {
try {
ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
+
clearRelationshipAttributes(ret);
} catch (Exception e) {
ret = null; // entity doesn't exist in Atlas
@@ -288,7 +298,7 @@ public class KafkaBridge {
@VisibleForTesting
AtlasEntityWithExtInfo updateEntityInAtlas(AtlasEntityWithExtInfo entity) throws Exception {
- AtlasEntityWithExtInfo ret = null;
+ AtlasEntityWithExtInfo ret;
EntityMutationResponse response = atlasClientV2.updateEntity(entity);
if (response != null) {
@@ -348,4 +358,4 @@ public class KafkaBridge {
entity.getRelationshipAttributes().clear();
}
}
-}
+}
\ No newline at end of file
diff --git a/addons/models/1000-Hadoop/patches/018-kafka_topic_add_rf_attribute.json b/addons/models/1000-Hadoop/patches/018-kafka_topic_add_rf_attribute.json
new file mode 100644
index 0000000..cc74bad
--- /dev/null
+++ b/addons/models/1000-Hadoop/patches/018-kafka_topic_add_rf_attribute.json
@@ -0,0 +1,23 @@
+{
+ "patches": [
+ {
+ "id": "TYPEDEF_PATCH_1000_018_001",
+ "description": "Add 'replicationFactor' attribute to kafka_topic",
+ "action": "ADD_ATTRIBUTE",
+ "typeName": "kafka_topic",
+ "applyToVersion": "1.7",
+ "updateToVersion": "1.8",
+ "params": null,
+ "attributeDefs": [
+ {
+ "name": "replicationFactor",
+ "typeName": "int",
+ "cardinality": "SINGLE",
+ "isIndexable": true,
+ "isOptional": true,
+ "isUnique": false
+ }
+ ]
+ }
+ ]
+}
diff --git a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
index 14e205a..eea3311 100644
--- a/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
+++ b/common/src/main/java/org/apache/atlas/utils/KafkaUtils.java
@@ -48,37 +48,37 @@ public class KafkaUtils implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
- static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
- private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
- private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
- private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
- private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
+ private static final String JAAS_CONFIG_PREFIX_PARAM = "atlas.jaas";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_NAME_PARAM = "loginModuleName";
+ private static final String JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM = "loginModuleControlFlag";
+ private static final String JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG = "required";
private static final String JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS = "optional|requisite|sufficient|required";
- private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
- private static final String JAAS_PRINCIPAL_PROP = "principal";
- private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
- private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
- private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import";
+ private static final String JAAS_CONFIG_LOGIN_OPTIONS_PREFIX = "option";
+ private static final String JAAS_PRINCIPAL_PROP = "principal";
+ private static final String JAAS_DEFAULT_CLIENT_NAME = "KafkaClient";
+ private static final String JAAS_TICKET_BASED_CLIENT_NAME = "ticketBased-KafkaClient";
+ private static final String IMPORT_INTERNAL_TOPICS = "atlas.kafka.bridge.enable.internal.topics.import";
- public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
-
- final protected Properties kafkaConfiguration;
+ public static final String ATLAS_KAFKA_PROPERTY_PREFIX = "atlas.kafka";
+ public static final String KAFKA_SASL_JAAS_CONFIG_PROPERTY = "sasl.jaas.config";
+ final protected Properties kafkaConfiguration;
final protected AdminClient adminClient;
-
- final protected boolean importInternalTopics;
+ final protected boolean importInternalTopics;
public KafkaUtils(Configuration atlasConfiguration) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils() ");
}
+
this.kafkaConfiguration = ApplicationProperties.getSubsetAsProperties(atlasConfiguration, ATLAS_KAFKA_PROPERTY_PREFIX);
setKafkaJAASProperties(atlasConfiguration, kafkaConfiguration);
- adminClient = AdminClient.create(this.kafkaConfiguration);
+
+ adminClient = AdminClient.create(this.kafkaConfiguration);
importInternalTopics = atlasConfiguration.getBoolean(IMPORT_INTERNAL_TOPICS, false);
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils() ");
}
}
@@ -86,100 +86,143 @@ public class KafkaUtils implements AutoCloseable {
public void createTopics(List<String> topicNames, int numPartitions, int replicationFactor)
throws TopicExistsException, ExecutionException, InterruptedException {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> createTopics() ");
}
List<NewTopic> newTopicList = topicNames.stream()
- .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
- .collect(Collectors.toList());
+ .map(topicName -> new NewTopic(topicName, numPartitions, (short) replicationFactor))
+ .collect(Collectors.toList());
+
+ CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
+ Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();
- CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
- Map<String, KafkaFuture<Void>> futureMap = createTopicsResult.values();
- for(Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) {
- String topicName = futureEntry.getKey();
+ for (Map.Entry<String, KafkaFuture<Void>> futureEntry : futureMap.entrySet()) {
KafkaFuture<Void> future = futureEntry.getValue();
+
future.get();
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("<== createTopics() ");
}
}
public List<String> listAllTopics() throws ExecutionException, InterruptedException {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.listAllTopics() ");
}
+
ListTopicsResult listTopicsResult = adminClient.listTopics((new ListTopicsOptions()).listInternal(importInternalTopics));
- List<String> topicNameList = new ArrayList<>(listTopicsResult.names().get());
+ List<String> ret = new ArrayList<>(listTopicsResult.names().get());
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.listAllTopics() ");
}
- return topicNameList;
+ return ret;
}
public Integer getPartitionCount(String topicName) throws ExecutionException, InterruptedException {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.getPartitionCount({})", topicName);
}
- Integer partitionCount = null;
- DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
- Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values();
- for(Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) {
- KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue();
- TopicDescription topicDescription = topicDescriptionFuture.get();
- List<TopicPartitionInfo> partitionList = topicDescription.partitions();
- partitionCount = partitionList.size();
+ Integer ret = null;
+ List<TopicPartitionInfo> partitionList = getPartitionList(topicName);
+
+ if (partitionList != null) {
+ ret = partitionList.size();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, ret);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("<== KafkaUtils.getPartitionCount returning for topic {} with count {}", topicName, partitionCount);
+ return ret;
+ }
+
+ public Integer getReplicationFactor(String topicName) throws ExecutionException, InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> KafkaUtils.getReplicationFactor({})", topicName);
+ }
+
+ Integer ret = null;
+ List<TopicPartitionInfo> partitionList = getPartitionList(topicName);
+
+ if (partitionList != null) {
+ ret = partitionList.stream().mapToInt(x -> x.replicas().size()).max().getAsInt();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== KafkaUtils.getReplicationFactor returning for topic {} with replicationFactor {}", topicName, ret);
+ }
+
+ return ret;
+ }
+
+ private List<TopicPartitionInfo> getPartitionList(String topicName) throws ExecutionException, InterruptedException {
+ List<TopicPartitionInfo> ret = null;
+ DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topicName));
+
+ if (describeTopicsResult != null) {
+ Map<String, KafkaFuture<TopicDescription>> futureMap = describeTopicsResult.values();
+
+ for (Map.Entry<String, KafkaFuture<TopicDescription>> futureEntry : futureMap.entrySet()) {
+ KafkaFuture<TopicDescription> topicDescriptionFuture = futureEntry.getValue();
+ TopicDescription topicDescription = topicDescriptionFuture.get();
+
+ ret = topicDescription.partitions();
+ }
}
- return partitionCount;
+ return ret;
}
public void close() {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.close()");
}
- if(adminClient != null) {
+ if (adminClient != null) {
adminClient.close();
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.close()");
}
}
public static void setKafkaJAASProperties(Configuration configuration, Properties kafkaProperties) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("==> KafkaUtils.setKafkaJAASProperties()");
}
- if(kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
+ if (kafkaProperties.containsKey(KAFKA_SASL_JAAS_CONFIG_PROPERTY)) {
LOG.debug("JAAS config is already set, returning");
+
return;
}
Properties jaasConfig = ApplicationProperties.getSubsetAsProperties(configuration, JAAS_CONFIG_PREFIX_PARAM);
// JAAS Configuration is present then update set those properties in sasl.jaas.config
- if(jaasConfig != null && !jaasConfig.isEmpty()) {
+
+ if (jaasConfig != null && !jaasConfig.isEmpty()) {
String jaasClientName = JAAS_DEFAULT_CLIENT_NAME;
// Required for backward compatability for Hive CLI
if (!isLoginKeytabBased() && isLoginTicketBased()) {
- LOG.debug("Checking if ticketBased-KafkaClient is set");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking if ticketBased-KafkaClient is set");
+ }
+
// if ticketBased-KafkaClient property is not specified then use the default client name
String ticketBasedConfigPrefix = JAAS_CONFIG_PREFIX_PARAM + "." + JAAS_TICKET_BASED_CLIENT_NAME;
Configuration ticketBasedConfig = configuration.subset(ticketBasedConfigPrefix);
- if(ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
- LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+ if (ticketBasedConfig != null && !ticketBasedConfig.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ticketBased-KafkaClient JAAS configuration is set, using it");
+ }
jaasClientName = JAAS_TICKET_BASED_CLIENT_NAME;
} else {
@@ -193,24 +236,31 @@ public class KafkaUtils implements AutoCloseable {
if (loginModuleName == null) {
LOG.error("Unable to add JAAS configuration for client [{}] as it is missing param [{}]. Skipping JAAS config for [{}]", jaasClientName, keyParam, jaasClientName);
+
return;
}
keyParam = keyPrefix + JAAS_CONFIG_LOGIN_MODULE_CONTROL_FLAG_PARAM;
+
String controlFlag = jaasConfig.getProperty(keyParam);
- if(StringUtils.isEmpty(controlFlag)) {
+ if (StringUtils.isEmpty(controlFlag)) {
String validValues = JAAS_VALID_LOGIN_MODULE_CONTROL_FLAG_OPTIONS;
+
controlFlag = JAAS_DEFAULT_LOGIN_MODULE_CONTROL_FLAG;
+
LOG.warn("Unknown JAAS configuration value for ({}) = [{}], valid value are [{}] using the default value, REQUIRED", keyParam, controlFlag, validValues);
}
- String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
- String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
- int optionPrefixLen = optionPrefix.length();
+
+ String optionPrefix = keyPrefix + JAAS_CONFIG_LOGIN_OPTIONS_PREFIX + ".";
+ String principalOptionKey = optionPrefix + JAAS_PRINCIPAL_PROP;
+ int optionPrefixLen = optionPrefix.length();
StringBuffer optionStringBuffer = new StringBuffer();
+
for (String key : jaasConfig.stringPropertyNames()) {
if (key.startsWith(optionPrefix)) {
String optionVal = jaasConfig.getProperty(key);
+
if (optionVal != null) {
optionVal = optionVal.trim();
@@ -223,16 +273,18 @@ public class KafkaUtils implements AutoCloseable {
}
optionVal = surroundWithQuotes(optionVal);
+
optionStringBuffer.append(String.format(" %s=%s", key.substring(optionPrefixLen), optionVal));
}
}
}
String newJaasProperty = String.format("%s %s %s ;", loginModuleName.trim(), controlFlag, optionStringBuffer.toString());
+
kafkaProperties.put(KAFKA_SASL_JAAS_CONFIG_PROPERTY, newJaasProperty);
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("<== KafkaUtils.setKafkaJAASProperties()");
}
}
@@ -262,17 +314,19 @@ public class KafkaUtils implements AutoCloseable {
}
static String surroundWithQuotes(String optionVal) {
- if(StringUtils.isEmpty(optionVal)) {
+ if (StringUtils.isEmpty(optionVal)) {
return optionVal;
}
+
String ret = optionVal;
// For property values which have special chars like "@" or "/", we need to enclose it in
// double quotes, so that Kafka can parse it
// If the property is already enclosed in double quotes, then do nothing.
- if(optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
+ if (optionVal.indexOf(0) != '"' && optionVal.indexOf(optionVal.length() - 1) != '"') {
// If the string as special characters like except _,-
final String SPECIAL_CHAR_LIST = "/!@#%^&*";
+
if (StringUtils.containsAny(optionVal, SPECIAL_CHAR_LIST)) {
ret = String.format("\"%s\"", optionVal);
}
@@ -280,5 +334,4 @@ public class KafkaUtils implements AutoCloseable {
return ret;
}
-
-}
+}
\ No newline at end of file