Posted to commits@atlas.apache.org by si...@apache.org on 2021/11/15 19:22:04 UTC
[atlas] branch master updated: ATLAS-4246: Make Kafka Interface aware of Kafka Schema Registry
This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new b4ee37b ATLAS-4246: Make Kafka Interface aware of Kafka Schema Registry
b4ee37b is described below
commit b4ee37b6d850baa0803deb39652e6f4ff6d665e7
Author: Aileen Toleikis <ai...@hpe.com>
AuthorDate: Mon Nov 15 11:18:49 2021 -0800
ATLAS-4246: Make Kafka Interface aware of Kafka Schema Registry
Signed-off-by: Sidharth Mishra <si...@apache.org>
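With this change the bridge looks up each topic's value subject ("<topic>-value") in a Confluent-compatible Schema Registry and mirrors every schema version, including nested Avro fields, into Atlas as avro_schema and avro_field entities. The registry host is read from the KAFKA_SCHEMA_REGISTRY environment variable (default "localhost"). A minimal sketch of the two REST calls the bridge issues per topic — the host:port, class name and subject below are illustrative, not part of the patch:

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClientBuilder;
    import org.apache.http.util.EntityUtils;

    public class RegistryLookupSketch {
        public static void main(String[] args) throws Exception {
            // host:port of the registry; the bridge reads this from KAFKA_SCHEMA_REGISTRY
            String registry = System.getenv().getOrDefault("KAFKA_SCHEMA_REGISTRY", "localhost:8081");
            String subject  = "test_topic-value"; // value subject for topic "test_topic"

            try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
                // 1) list all registered versions for the subject, e.g. [1,2,3]
                HttpGet versions = new HttpGet("http://" + registry + "/subjects/" + subject + "/versions/");
                try (CloseableHttpResponse response = httpClient.execute(versions)) {
                    System.out.println(EntityUtils.toString(response.getEntity()));
                }

                // 2) fetch one concrete version; the Avro schema sits in the
                //    "schema" field of the returned JSON document
                HttpGet schemaV1 = new HttpGet("http://" + registry + "/subjects/" + subject + "/versions/1");
                try (CloseableHttpResponse response = httpClient.execute(schemaV1)) {
                    System.out.println(EntityUtils.toString(response.getEntity()));
                }
            }
        }
    }

Note that the bridge concatenates "http://" + host + path verbatim, so KAFKA_SCHEMA_REGISTRY must include the port when the registry does not listen on port 80.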
---
addons/kafka-bridge/pom.xml | 17 ++
.../org/apache/atlas/kafka/bridge/KafkaBridge.java | 294 +++++++++++++++++++--
.../kafka/bridge/SchemaRegistryConnector.java | 138 ++++++++++
.../apache/atlas/kafka/model/KafkaDataTypes.java | 4 +-
.../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 208 ++++++++++++++-
pom.xml | 2 +
6 files changed, 643 insertions(+), 20 deletions(-)
diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index 30fb53d..d3d6a12 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -137,6 +137,23 @@
<version>${hadoop.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpcomponents-httpclient.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${json-simple.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index f954824..8be2fca 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -39,6 +39,10 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +53,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
+import org.apache.avro.Schema;
+import java.io.IOException;
public class KafkaBridge {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
+ private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE = System.getenv("KAFKA_SCHEMA_REGISTRY");
+ public static String KAFKA_SCHEMA_REGISTRY_HOSTNAME = "localhost";
private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1;
private static final String ATLAS_ENDPOINT = "atlas.rest.address";
@@ -70,18 +77,31 @@ public class KafkaBridge {
private static final String URI = "uri";
private static final String CLUSTERNAME = "clusterName";
private static final String TOPIC = "topic";
private static final String FORMAT_KAFKA_TOPIC_QUALIFIED_NAME = "%s@%s";
+ private static final String TYPE = "type";
+ private static final String NAMESPACE = "namespace";
+ private static final String FIELDS = "fields";
+ private static final String AVRO_SCHEMA = "avroSchema";
+ private static final String SCHEMA_VERSION_ID = "versionId";
+
+ private static final String FORMAT_KAFKA_SCHEMA_QUALIFIED_NAME = "%s@%s@%s";
+ private static final String FORMAT_KAFKA_FIELD_QUALIFIED_NAME = "%s@%s@%s@%s";
private final List<String> availableTopics;
private final String metadataNamespace;
private final AtlasClientV2 atlasClientV2;
private final KafkaUtils kafkaUtils;
-
+ private final CloseableHttpClient httpClient;
public static void main(String[] args) {
int exitCode = EXIT_CODE_FAILED;
AtlasClientV2 atlasClientV2 = null;
KafkaUtils kafkaUtils = null;
+ CloseableHttpClient httpClient = null;
+
+ System.out.print("\n################################\n");
+ System.out.print("# Custom Kafka bridge #\n");
+ System.out.print("################################\n\n");
try {
Options options = new Options();
@@ -114,6 +134,10 @@ public class KafkaBridge {
KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
+ if (StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) { // System.getenv may return null
+ KAFKA_SCHEMA_REGISTRY_HOSTNAME = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
+ }
+
if (StringUtils.isNotEmpty(fileToImport)) {
File f = new File(fileToImport);
@@ -154,8 +178,18 @@ public class KafkaBridge {
if (kafkaUtils != null) {
kafkaUtils.close();
}
+
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOG.error("Could not close http client: ", e);
+ }
+ }
}
+ System.out.print("\n\n");
+
System.exit(exitCode);
}
@@ -164,6 +198,7 @@ public class KafkaBridge {
this.metadataNamespace = getMetadataNamespace(atlasConf);
this.kafkaUtils = kafkaUtils;
this.availableTopics = this.kafkaUtils.listAllTopics();
+ this.httpClient = HttpClientBuilder.create().build();
}
private String getMetadataNamespace(Configuration config) {
@@ -199,7 +234,9 @@ public class KafkaBridge {
@VisibleForTesting
AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
String topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic);
- AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName);
+ AtlasEntityWithExtInfo topicEntity = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
+
+ System.out.print("\n"); // add a new line for each topic
if (topicEntity == null) {
System.out.println("Adding Kafka topic " + topic);
@@ -223,8 +260,63 @@ public class KafkaBridge {
}
@VisibleForTesting
+ AtlasEntityWithExtInfo createOrUpdateSchema(String schema, String schemaName, String namespace, int version) throws Exception {
+ String schemaQualifiedName = getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" + version);
+ AtlasEntityWithExtInfo schemaEntity = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), schemaQualifiedName);
+
+ if (schemaEntity == null) {
+ System.out.println("---Adding Kafka schema " + schema);
+ LOG.info("Importing Kafka schema: {}", schemaQualifiedName);
+
+ AtlasEntity entity = getSchemaEntity(schema, schemaName, namespace, version, null);
+
+ schemaEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
+ } else {
+ System.out.println("---Updating Kafka schema " + schema);
+ LOG.info("Kafka schema {} already exists in Atlas. Updating it..", schemaQualifiedName);
+
+ AtlasEntity entity = getSchemaEntity(schema, schemaName, namespace, version, schemaEntity.getEntity());
+
+ schemaEntity.setEntity(entity);
+
+ schemaEntity = updateEntityInAtlas(schemaEntity);
+ }
+
+ return schemaEntity;
+ }
+
+ @VisibleForTesting
+ AtlasEntityWithExtInfo createOrUpdateField(Schema.Field field, String schemaName, String namespace, int version, String fullname) throws Exception {
+ fullname = concatFullname(field.name(), fullname, "");
+ String fieldQualifiedName = getFieldQualifiedName(metadataNamespace, fullname, schemaName + "-value", "v" + version);
+ AtlasEntityWithExtInfo fieldEntity = findEntityInAtlas(KafkaDataTypes.AVRO_FIELD.getName(), fieldQualifiedName);
+
+ if (fieldEntity == null) {
+ System.out.println("---Adding Avro field " + fullname);
+ LOG.info("Importing Avro field: {}", fieldQualifiedName);
+
+ AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version, null, fullname);
+
+ fieldEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
+ } else {
+ System.out.println("---Updating Avro field " + fullname);
+ LOG.info("Avro field {} already exists in Atlas. Updating it..", fieldQualifiedName);
+
+ AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version, fieldEntity.getEntity(), fullname);
+
+ fieldEntity.setEntity(entity);
+
+ fieldEntity = updateEntityInAtlas(fieldEntity);
+ }
+
+ return fieldEntity;
+ }
+
+
+ @VisibleForTesting
AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws Exception {
final AtlasEntity ret;
+ List<AtlasEntity> createdSchemas;
if (topicEntity == null) {
ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
@@ -250,33 +342,140 @@ public class KafkaBridge {
throw new Exception("Error while getting partition data for topic :" + topic, e);
}
+ createdSchemas = findOrCreateAtlasSchema(topic);
+
+ if (CollectionUtils.isNotEmpty(createdSchemas)) {
+ ret.setAttribute(AVRO_SCHEMA, createdSchemas);
+ ret.setRelationshipAttribute(AVRO_SCHEMA, createdSchemas);
+ }
+
return ret;
}
@VisibleForTesting
- static String getTopicQualifiedName(String metadataNamespace, String topic) {
- return String.format(FORMAT_KAFKA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace);
+ AtlasEntity getSchemaEntity(String schema, String schemaName, String namespace, int version, AtlasEntity schemaEntity) throws Exception {
+ final AtlasEntity ret;
+ List<AtlasEntity> createdFields = new ArrayList<>();
+
+
+ if (schemaEntity == null) {
+ ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
+ } else {
+ ret = schemaEntity;
+ }
+
+ Schema parsedSchema = new Schema.Parser().parse(schema);
+
+ String qualifiedName = getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" + version);
+
+ if (namespace == null) {
+ namespace = (parsedSchema.getNamespace() != null) ? parsedSchema.getNamespace() : KAFKA_METADATA_NAMESPACE;
+ }
+
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+ ret.setAttribute(TYPE, parsedSchema.getType());
+ ret.setAttribute(NAMESPACE, namespace);
+ ret.setAttribute(NAME, parsedSchema.getName() + "(v" + version + ")");
+ ret.setAttribute(SCHEMA_VERSION_ID, version);
+
+ createdFields = createNestedFields(parsedSchema, schemaName, namespace, version, "");
+
+ if (CollectionUtils.isNotEmpty(createdFields)) {
+ ret.setRelationshipAttribute(FIELDS, createdFields);
+ }
+
+ return ret;
}
- private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) {
- AtlasEntityWithExtInfo ret = null;
+ List<AtlasEntity> createNestedFields(Schema parsedSchema, String schemaName, String namespace, int version, String fullname) throws Exception {
+ List<AtlasEntity> entityArray = new ArrayList<>();
+ AtlasEntityWithExtInfo fieldInAtlas;
+ JSONParser parser = new JSONParser();
- try {
- ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
+ for (Schema.Field field:parsedSchema.getFields()) {
+
+ if (field.schema().getType() == Schema.Type.ARRAY) {
+ System.out.println("---Array field detected, descending into its element type");
+ String subfields = ((JSONObject) parser.parse(field.schema().toString())).get("items").toString();
+ Schema parsedSubSchema = new Schema.Parser().parse(subfields);
+
+ fullname = concatFullname(field.name(), fullname, parsedSubSchema.getName());
+
+ entityArray.addAll(createNestedFields(parsedSubSchema, schemaName, namespace, version, fullname));
+ }
+
+ else if (field.schema().getType() == Schema.Type.RECORD && !schemaName.equals(field.name())) {
+ System.out.println("---Nested record detected, descending into its fields");
+ fullname = concatFullname(field.name(), fullname, "");
+ entityArray.addAll(createNestedFields(field.schema(), schemaName, namespace, version, fullname));
+ }
+
+ else {
+ fieldInAtlas = createOrUpdateField(field, schemaName, namespace, version, fullname);
- clearRelationshipAttributes(ret);
- } catch (Exception e) {
- ret = null; // entity doesn't exist in Atlas
+ entityArray.add(fieldInAtlas.getEntity());
+ }
+ }
+ entityArray.sort((o1, o2) -> {
+ if (o1.getAttribute(NAME) != null && o2.getAttribute(NAME) != null) {
+ String str1 = o1.getAttribute(NAME).toString();
+ String str2 = o2.getAttribute(NAME).toString();
+
+ return str1.compareTo(str2);
+ } else {
+ return 0;
+ }
+ });
+
+ return entityArray;
+ }
+
+ @VisibleForTesting
+ AtlasEntity getFieldEntity(Schema.Field field, String schemaName, String namespace, int version, AtlasEntity fieldEntity, String fullname) throws Exception {
+ AtlasEntity ret;
+
+ if (fieldEntity == null) {
+ ret = new AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName());
+ } else {
+ ret = fieldEntity;
}
+ String qualifiedName = getFieldQualifiedName(metadataNamespace, fullname, schemaName + "-value", "v" + version);
+
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+ ret.setAttribute(NAME, fullname + "(v" + version + ")");
+ // The 'type' attribute expects array<avro_type> and cannot be set from a single
+ // Schema.Type, so the Avro type is recorded in the description instead.
+ ret.setAttribute(DESCRIPTION_ATTR, field.schema().getType());
return ret;
}
@VisibleForTesting
+ static String getTopicQualifiedName(String metadataNamespace, String topic) {
+ return String.format(FORMAT_KAFKA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace);
+ }
+
+ @VisibleForTesting
+ static String getSchemaQualifiedName(String metadataNamespace, String schema, String version) {
+ return String.format(FORMAT_KAFKA_SCHEMA_QUALIFIED_NAME, schema.toLowerCase(), version, metadataNamespace);
+ }
+
+ @VisibleForTesting
+ static String getFieldQualifiedName(String metadataNamespace, String field, String schemaName, String version) {
+ return String.format(FORMAT_KAFKA_FIELD_QUALIFIED_NAME, field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace);
+ }
+
+ @VisibleForTesting
AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception {
- Map<String, String> attributes = Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+ AtlasEntityWithExtInfo ret = null;
+
+ try {
+ ret = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+ }
+ catch (Exception e) {
+ LOG.info("Entity {} of type {} not found in Atlas: {}", qualifiedName, typeName, e.getMessage());
+ }
- return atlasClientV2.getEntityByAttribute(typeName, attributes);
+ return ret;
}
@VisibleForTesting
@@ -284,7 +483,6 @@ public class KafkaBridge {
AtlasEntityWithExtInfo ret = null;
EntityMutationResponse response = atlasClientV2.createEntity(entity);
List<AtlasEntityHeader> entities = response.getCreatedEntities();
-
if (CollectionUtils.isNotEmpty(entities)) {
AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
@@ -358,4 +556,66 @@ public class KafkaBridge {
entity.getRelationshipAttributes().clear();
}
}
-}
+
+ private List<AtlasEntity> findOrCreateAtlasSchema(String schemaName) throws Exception {
+ List<AtlasEntity> entities = new ArrayList<>();
+ // Handling Schemas
+ ArrayList<Integer> versions = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient,schemaName + "-value");
+
+ for (int version:versions) {
+ String kafkaSchema = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(httpClient, schemaName + "-value", version);
+
+ if(kafkaSchema != null) {
+ // Schema exists in Kafka Schema Registry
+ System.out.println("---Found Schema " + schemaName + "-value in Kafka Schema Registry with Version " + version);
+ LOG.info("Found Schema {}-value in Kafka Schema Registry with Version {}", schemaName, version);
+
+ AtlasEntityWithExtInfo atlasSchemaEntity = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" + version));
+
+ if (atlasSchemaEntity != null) {
+ // Schema exists in Kafka Schema Registry AND in Atlas
+ System.out.println("---Found entity avro_schema " + schemaName + " in Atlas");
+ LOG.info("Found entity avro_schema {} in Atlas", schemaName);
+ } else {
+ // Schema exists in Kafka Schema Registry but NOT in Atlas
+ System.out.println("---Entity avro_schema " + schemaName + " not found in Atlas");
+ LOG.info("Entity avro_schema {} not found in Atlas", schemaName);
+ }
+
+ // In both cases the schema is (re)imported; createOrUpdateSchema decides
+ // whether to create or update the Atlas entity.
+ AtlasEntityWithExtInfo createdSchema = createOrUpdateSchema(kafkaSchema, schemaName, null, version);
+
+ entities.add(createdSchema.getEntity());
+ }
+ }
+
+ return entities;
+ }
+
+ private String concatFullname(String fieldName, String fullname, String subSchemaName) {
+ if (fullname.isEmpty()) {
+ return subSchemaName.isEmpty() ? fieldName : fieldName + "." + subSchemaName;
+ }
+
+ return subSchemaName.isEmpty() ? fullname + "." + fieldName
+ : fullname + "." + subSchemaName + "." + fieldName;
+ }
+}
\ No newline at end of file
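The qualified-name helpers in KafkaBridge compose Atlas identifiers from topic, subject, version and metadata namespace. A worked example with hypothetical values (namespace "primary", topic "test_topic", version 1), not part of the patch:

    public class QualifiedNameSketch {
        public static void main(String[] args) {
            String ns = "primary"; // hypothetical metadata namespace
            System.out.println(String.format("%s@%s", "test_topic", ns));
            // topic  -> test_topic@primary
            System.out.println(String.format("%s@%s@%s", "test_topic-value", "v1", ns));
            // schema -> test_topic-value@v1@primary
            System.out.println(String.format("%s@%s@%s@%s", "address.street", "test_topic-value", "v1", ns));
            // field  -> address.street@test_topic-value@v1@primary
            // Nested field names are flattened by concatFullname, e.g. a record
            // field "address" containing "street" yields "address.street".
        }
    }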
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
new file mode 100644
index 0000000..d5d85d6
--- /dev/null
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka.bridge;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+public class SchemaRegistryConnector {
+ private static final String SCHEMA_KEY = "schema";
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryConnector.class);
+
+ static ArrayList<Integer> getVersionsKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject) throws IOException {
+ ArrayList<Integer> list = new ArrayList<>();
+ JSONParser parser = new JSONParser();
+
+ HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/");
+ getRequest.addHeader("accept", "application/json");
+ getRequest.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json");
+
+ try {
+ CloseableHttpResponse response = httpClient.execute(getRequest);
+
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ //found corresponding Schema version in Registry
+ try {
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8));
+ JSONArray output = (JSONArray) parser.parse(br.readLine());
+ int len = output.size();
+ for (int i = 0; i < len; i++) {
+ list.add(((Long) output.get(i)).intValue());
+ }
+
+ System.out.println("---Found following versions to schema: " + subject + " Versions: " + list.toString());
+ LOG.info("Found following versions to schema: {} Versions: {}", subject, list.toString());
+
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close(); // close response
+ return list;
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("---Error reading versions to schema: " + subject + " in Kafka");
+ LOG.error("Error reading versions to schema: " + subject + " in Kafka: ", e.getMessage());
+ }
+
+ } else if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+ // did not find any schema to the topic
+ System.out.println("---No schema versions found for schema: " + subject + " in Schema Registry");
+ LOG.info("No schema versions found for schema: {} in Schema Registry", subject);
+ } else {
+ // no connection to schema registry
+ System.out.println("---Cannot connect to schema registry");
+ LOG.warn("Cannot connect to schema registry");
+ }
+
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ }
+ catch(Exception e) {
+ System.out.println("---Error getting versions to schema: " + subject + " from Kafka");
+ LOG.error("Error getting versions to schema: " + subject + " from Kafka: ", e);
+ }
+ return list;
+ }
+
+ static String getSchemaFromKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject, int version) throws IOException {
+ HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/" + version);
+ getRequest.addHeader("accept", "application/json");
+ getRequest.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json");
+ JSONParser parser = new JSONParser();
+
+ CloseableHttpResponse response = httpClient.execute(getRequest);
+
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK){
+ //found corresponding Schema in Registry
+ try {
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader((response.getEntity().getContent()), StandardCharsets.UTF_8));
+ JSONObject output = (JSONObject) parser.parse(br.readLine());
+
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ return output.get(SCHEMA_KEY).toString();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("---Error reading versions to schema: " + subject + " in Kafka");
+ LOG.error("Error reading versions to schema: " + subject + " in Kafka: ", e);
+ }
+
+ }
+
+ else if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+ // did not find any schema to the topic
+ System.out.println("---Cannot find the corresponding schema to: " + subject + "in Kafka");
+ LOG.info("Cannot find the corresponding schema to: {} in Kafka", subject);
+ }
+
+ else {
+ // any other error when connecting to schema registry
+ System.out.println("---Cannot connect to schema registry at: " + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
+ LOG.warn("Cannot connect to schema registry at: {}", KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
+ }
+
+ EntityUtils.consumeQuietly(response.getEntity());
+ response.close();
+ return null;
+ }
+}
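Taken together, KafkaBridge drives the connector roughly as in the sketch below — illustrative only; the class name and subject are made up, and a registry must be reachable at KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME (host:port):

    package org.apache.atlas.kafka.bridge; // the connector's helpers are package-private

    import java.util.ArrayList;

    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClientBuilder;

    public class ConnectorUsageSketch {
        public static void main(String[] args) throws Exception {
            try (CloseableHttpClient httpClient = HttpClientBuilder.create().build()) {
                // fetch every registered version of one subject, then its schema text
                ArrayList<Integer> versions =
                        SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient, "test_topic-value");

                for (int version : versions) {
                    String avroSchema =
                            SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(httpClient, "test_topic-value", version);
                    System.out.println("v" + version + ": " + avroSchema);
                }
            }
        }
    }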
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java
index 0f81b4c..200a759 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java
@@ -23,7 +23,9 @@ package org.apache.atlas.kafka.model;
*/
public enum KafkaDataTypes {
// Classes
- KAFKA_TOPIC;
+ KAFKA_TOPIC,
+ AVRO_SCHEMA,
+ AVRO_FIELD;
public String getName() {
return name().toLowerCase();
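getName() lower-cases the constant name, so the new members resolve to the avro_schema and avro_field type names that the bridge queries in Atlas (the corresponding typedefs must already exist there). For illustration — assertions, not part of the patch:

    // Hypothetical check of the enum-to-typeName mapping used by the bridge:
    assert KafkaDataTypes.KAFKA_TOPIC.getName().equals("kafka_topic");
    assert KafkaDataTypes.AVRO_SCHEMA.getName().equals("avro_schema");
    assert KafkaDataTypes.AVRO_FIELD.getName().equals("avro_field");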
diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index f86ceb5..adbaddd 100644
--- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -25,11 +25,22 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.utils.KafkaUtils;
+import org.apache.avro.Schema;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
@@ -43,10 +54,21 @@ import static org.testng.Assert.assertEquals;
public class KafkaBridgeTest {
private static final String TEST_TOPIC_NAME = "test_topic";
- public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
- getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
private static final String CLUSTER_NAME = "primary";
private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
+ private static final String TEST_SCHEMA_NAME = "test_topic-value";
+ private static final int TEST_SCHEMA_VERSION = 1;
+ private static final String TEST_NAMESPACE = "test_namespace";
+ private static final ArrayList<Integer> TEST_SCHEMA_VERSION_LIST = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
+ private static final String TEST_SCHEMA = "{\"name\":\"test\",\"namespace\":\"testing\",\"type\":\"record\",\"fields\":[{\"name\":\"Field1\",\"type\":\"string\"},{\"name\":\"Field2\",\"type\":\"int\"}]}";
+ private static final Schema.Field TEST_FIELD_NAME = new Schema.Parser().parse(TEST_SCHEMA).getField("Field1");
+ private static final String TEST_FIELD_FULLNAME = "Field1";
+ public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+ getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
+ public static final AtlasEntity.AtlasEntityWithExtInfo SCHEMA_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+ getSchemaEntityWithGuid("2a9894bb-e535-4aa1-a00b-a7d21ac20738"));
+ public static final AtlasEntity.AtlasEntityWithExtInfo FIELD_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+ getFieldEntityWithGuid("41d1011f-d428-4f5a-9578-0a0a0439147f"));
@BeforeMethod
public void initializeMocks() {
@@ -59,6 +81,18 @@ public class KafkaBridgeTest {
return ret;
}
+ private static AtlasEntity getSchemaEntityWithGuid(String guid) {
+ AtlasEntity ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
+ ret.setGuid(guid);
+ return ret;
+ }
+
+ private static AtlasEntity getFieldEntityWithGuid(String guid) {
+ AtlasEntity ret = new AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName());
+ ret.setGuid(guid);
+ return ret;
+ }
+
@Test
public void testImportTopic() throws Exception {
KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
@@ -115,6 +149,58 @@ public class KafkaBridgeTest {
}
@Test
+ public void testCreateSchema() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeader.getGuid()).thenReturn(SCHEMA_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockCreateResponse.getCreatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.createEntity(any()))
+ .thenReturn(mockCreateResponse);
+ when(mockAtlasClientV2.getEntityByGuid(SCHEMA_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(SCHEMA_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateSchema(TEST_SCHEMA, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION);
+
+ assertEquals(SCHEMA_WITH_EXT_INFO, ret);
+ }
+
+ @Test
+ public void testCreateField() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeader.getGuid()).thenReturn(FIELD_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockCreateResponse.getCreatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.createEntity(any()))
+ .thenReturn(mockCreateResponse);
+ when(mockAtlasClientV2.getEntityByGuid(FIELD_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(FIELD_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateField(TEST_FIELD_NAME, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION, TEST_FIELD_FULLNAME);
+
+ assertEquals(FIELD_WITH_EXT_INFO, ret);
+ }
+
+ @Test
public void testUpdateTopic() throws Exception {
KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
when(mockKafkaUtils.listAllTopics())
@@ -141,4 +227,122 @@ public class KafkaBridgeTest {
assertEquals(TOPIC_WITH_EXT_INFO, ret);
}
+
+ @Test
+ public void testUpdateSchema() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockUpdateResponseSchema = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeaderSchema = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeaderSchema.getGuid()).thenReturn(SCHEMA_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockUpdateResponseSchema.getUpdatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeaderSchema));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.AVRO_SCHEMA.getName()), any()))
+ .thenReturn(SCHEMA_WITH_EXT_INFO);
+ when(mockAtlasClientV2.updateEntity(SCHEMA_WITH_EXT_INFO))
+ .thenReturn(mockUpdateResponseSchema);
+ when(mockAtlasClientV2.getEntityByGuid(SCHEMA_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(SCHEMA_WITH_EXT_INFO);
+
+ EntityMutationResponse mockUpdateResponseField = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeaderField = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeaderField.getGuid()).thenReturn(FIELD_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockUpdateResponseField.getUpdatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeaderField));
+
+ when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.AVRO_FIELD.getName()), any()))
+ .thenReturn(FIELD_WITH_EXT_INFO);
+ when(mockAtlasClientV2.updateEntity(FIELD_WITH_EXT_INFO))
+ .thenReturn(mockUpdateResponseField);
+ when(mockAtlasClientV2.getEntityByGuid(FIELD_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(FIELD_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateSchema(TEST_SCHEMA, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION);
+
+ assertEquals(SCHEMA_WITH_EXT_INFO, ret);
+ }
+
+ @Test
+ public void testUpdateField() throws Exception {
+ KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+ when(mockKafkaUtils.listAllTopics())
+ .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+ when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+ .thenReturn(3);
+
+ EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class);
+ AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+ when(mockAtlasEntityHeader.getGuid()).thenReturn(FIELD_WITH_EXT_INFO.getEntity().getGuid());
+ when(mockUpdateResponse.getUpdatedEntities())
+ .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+ AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+ when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.AVRO_FIELD.getName()), any()))
+ .thenReturn(FIELD_WITH_EXT_INFO);
+ when(mockAtlasClientV2.updateEntity(any()))
+ .thenReturn(mockUpdateResponse);
+ when(mockAtlasClientV2.getEntityByGuid(FIELD_WITH_EXT_INFO.getEntity().getGuid()))
+ .thenReturn(FIELD_WITH_EXT_INFO);
+
+ KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+ AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateField(TEST_FIELD_NAME, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION, TEST_FIELD_FULLNAME);
+
+ assertEquals(FIELD_WITH_EXT_INFO, ret);
+ }
+
+ @Test
+ public void testGetSchemas() throws Exception {
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ when(mockResponse.getStatusLine())
+ .thenReturn(mock(StatusLine.class));
+ when(mockResponse.getStatusLine().getStatusCode())
+ .thenReturn(HttpStatus.SC_OK);
+ when(mockResponse.getEntity())
+ .thenReturn(mock(HttpEntity.class));
+ when(mockResponse.getEntity().getContent())
+ .thenReturn(new ByteArrayInputStream(new String("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":"+ TEST_SCHEMA +"}").getBytes(StandardCharsets.UTF_8)));
+
+ CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class);
+ when(mockHttpClient.execute(any()))
+ .thenReturn(mockResponse);
+ when(mockHttpClient.getConnectionManager())
+ .thenReturn(mock(ClientConnectionManager.class));
+
+ String ret = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME,TEST_SCHEMA_VERSION);
+
+ assertEquals(TEST_SCHEMA, ret);
+ }
+
+ @Test
+ public void testGetSchemaVersions() throws Exception {
+ CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+ when(mockResponse.getStatusLine())
+ .thenReturn(mock(StatusLine.class));
+ when(mockResponse.getStatusLine().getStatusCode())
+ .thenReturn(HttpStatus.SC_OK);
+ when(mockResponse.getEntity())
+ .thenReturn(mock(HttpEntity.class));
+ when(mockResponse.getEntity().getContent())
+ .thenReturn(new ByteArrayInputStream(TEST_SCHEMA_VERSION_LIST.toString().getBytes(StandardCharsets.UTF_8)));
+
+ CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class);
+ when(mockHttpClient.execute(any()))
+ .thenReturn(mockResponse);
+ when(mockHttpClient.getConnectionManager())
+ .thenReturn(mock(ClientConnectionManager.class));
+
+ ArrayList<Integer> ret = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME);
+
+ assertEquals(TEST_SCHEMA_VERSION_LIST, ret);
+ }
+
}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 3c17648..86519da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -680,6 +680,7 @@
<aopalliance.version>1.0</aopalliance.version>
<aspectj.runtime.version>1.8.7</aspectj.runtime.version>
<atlas.surefire.options></atlas.surefire.options>
+ <avro.version>1.7.5</avro.version>
<calcite.version>1.16.0</calcite.version>
<checkstyle.failOnViolation>false</checkstyle.failOnViolation>
<codehaus.woodstox.stax2-api.version>3.1.4</codehaus.woodstox.stax2-api.version>
@@ -728,6 +729,7 @@
<jetty.version>9.4.31.v20200723</jetty.version>
<joda-time.version>2.10.6</joda-time.version>
<json.version>3.2.11</json.version>
+ <json-simple.version>1.1.1</json-simple.version>
<jsr.version>1.1</jsr.version>
<junit.version>4.13.1</junit.version>
<kafka.scala.binary.version>2.12</kafka.scala.binary.version>