Posted to commits@atlas.apache.org by si...@apache.org on 2021/11/15 19:22:04 UTC

[atlas] branch master updated: ATLAS-4246: Make Kafka Interface aware of Kafka Schema Registry

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ee37b  ATLAS-4246: Make Kafka Interface aware of Kafka Schema Registry
b4ee37b is described below

commit b4ee37b6d850baa0803deb39652e6f4ff6d665e7
Author: Aileen Toleikis <ai...@hpe.com>
AuthorDate: Mon Nov 15 11:18:49 2021 -0800

    ATLAS-4246: Make Kafka Interface aware of Kafka Schema Registry
    
    Signed-off-by: Sidharth Mishra <si...@apache.org>
---
 addons/kafka-bridge/pom.xml                        |  17 ++
 .../org/apache/atlas/kafka/bridge/KafkaBridge.java | 294 +++++++++++++++++++--
 .../kafka/bridge/SchemaRegistryConnector.java      | 138 ++++++++++
 .../apache/atlas/kafka/model/KafkaDataTypes.java   |   4 +-
 .../apache/atlas/kafka/bridge/KafkaBridgeTest.java | 208 ++++++++++++++-
 pom.xml                                            |   2 +
 6 files changed, 643 insertions(+), 20 deletions(-)
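
For context: the bridge resolves the Schema Registry host from the KAFKA_SCHEMA_REGISTRY environment variable (falling back to "localhost") and, for each topic, looks up the value schema under the subject "<topic>-value". A minimal sketch of that lookup flow, assuming a caller in the same package; the host and topic name are illustrative values, not part of this commit:

    // Hedged sketch only -- error handling elided, host/topic are example values.
    CloseableHttpClient client = HttpClientBuilder.create().build();
    KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME = "schema-registry.example.com:8081";

    // GET http://<host>/subjects/test_topic-value/versions/  ->  e.g. [1, 2]
    ArrayList<Integer> versions =
            SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(client, "test_topic-value");

    for (int version : versions) {
        // GET http://<host>/subjects/test_topic-value/versions/<n>  ->  JSON with a "schema" member
        String avroSchema =
                SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(client, "test_topic-value", version);
    }
    client.close();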

diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml
index 30fb53d..d3d6a12 100644
--- a/addons/kafka-bridge/pom.xml
+++ b/addons/kafka-bridge/pom.xml
@@ -137,6 +137,23 @@
             <version>${hadoop.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcomponents-httpclient.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>${json-simple.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
index f954824..8be2fca 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/KafkaBridge.java
@@ -39,6 +39,10 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,12 +53,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
+import org.apache.avro.Schema;
+import java.io.IOException;
 
 public class KafkaBridge {
     private static final Logger LOG                               = LoggerFactory.getLogger(KafkaBridge.class);
+    private static final String KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE = System.getenv("KAFKA_SCHEMA_REGISTRY");
+    public static        String KAFKA_SCHEMA_REGISTRY_HOSTNAME    = "localhost";
     private static final int    EXIT_CODE_SUCCESS                 = 0;
     private static final int    EXIT_CODE_FAILED                  = 1;
     private static final String ATLAS_ENDPOINT                    = "atlas.rest.address";
@@ -70,18 +77,31 @@ public class KafkaBridge {
     private static final String URI                               = "uri";
     private static final String CLUSTERNAME                       = "clusterName";
     private static final String TOPIC                             = "topic";
-    private static final String FORMAT_KAFKA_TOPIC_QUALIFIED_NAME = "%s@%s";
+    private static final String FORMAT_KAFKA_TOPIC_QUALIFIED_NAME = "%s@%s";
+    private static final String TYPE                              = "type";
+    private static final String NAMESPACE                         = "namespace";
+    private static final String FIELDS                            = "fields";
+    private static final String AVRO_SCHEMA                       = "avroSchema";
+    private static final String SCHEMA_VERSION_ID                 = "versionId";
+
+    private static final String FORMAT_KAFKA_SCHEMA_QUALIFIED_NAME      = "%s@%s@%s";
+    private static final String FORMAT_KAFKA_FIELD_QUALIFIED_NAME       = "%s@%s@%s@%s";
 
     private final List<String>  availableTopics;
     private final String        metadataNamespace;
     private final AtlasClientV2 atlasClientV2;
     private final KafkaUtils    kafkaUtils;
-
+    private final CloseableHttpClient httpClient;
 
     public static void main(String[] args) {
         int           exitCode      = EXIT_CODE_FAILED;
         AtlasClientV2 atlasClientV2 = null;
         KafkaUtils    kafkaUtils    = null;
+        CloseableHttpClient httpClient = null;
+
+        System.out.print("\n################################\n");
+        System.out.print("# Custom Kafka bridge #\n");
+        System.out.print("################################\n\n");
 
         try {
             Options options = new Options();
@@ -114,6 +134,10 @@ public class KafkaBridge {
 
             KafkaBridge importer = new KafkaBridge(atlasConf, atlasClientV2, kafkaUtils);
 
+            if (StringUtils.isNotEmpty(KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE)) { // System.getenv() returns null when the variable is unset
+                KAFKA_SCHEMA_REGISTRY_HOSTNAME = KAFKA_SCHEMA_REGISTRY_ENV_VARIABLE;
+            }
+
             if (StringUtils.isNotEmpty(fileToImport)) {
                 File f = new File(fileToImport);
 
@@ -154,8 +178,18 @@ public class KafkaBridge {
             if (kafkaUtils != null) {
                 kafkaUtils.close();
             }
+
+            if (httpClient != null) {
+                try {
+                    httpClient.close();
+                } catch (IOException e) {
+                    LOG.error("Could not close http client: ", e);
+                }
+            }
         }
 
+        System.out.print("\n\n");
+
         System.exit(exitCode);
     }
 
@@ -164,6 +198,7 @@ public class KafkaBridge {
         this.metadataNamespace = getMetadataNamespace(atlasConf);
         this.kafkaUtils        = kafkaUtils;
         this.availableTopics   = this.kafkaUtils.listAllTopics();
+        this.httpClient        = HttpClientBuilder.create().build();
     }
 
     private String getMetadataNamespace(Configuration config) {
@@ -199,7 +234,9 @@ public class KafkaBridge {
     @VisibleForTesting
     AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
         String                 topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic);
-        AtlasEntityWithExtInfo topicEntity        = findTopicEntityInAtlas(topicQualifiedName);
+        AtlasEntityWithExtInfo topicEntity        = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(),topicQualifiedName);
+
+        System.out.print("\n"); // add a new line for each topic
 
         if (topicEntity == null) {
             System.out.println("Adding Kafka topic " + topic);
@@ -223,8 +260,63 @@ public class KafkaBridge {
     }
 
     @VisibleForTesting
+    AtlasEntityWithExtInfo createOrUpdateSchema(String schema, String schemaName, String namespace, int version) throws Exception {
+        String                 schemaQualifiedName = getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" + version);
+        AtlasEntityWithExtInfo schemaEntity        = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), schemaQualifiedName);
+
+        if (schemaEntity == null) {
+            System.out.println("---Adding Kafka schema " + schema);
+            LOG.info("Importing Kafka schema: {}", schemaQualifiedName);
+
+            AtlasEntity entity = getSchemaEntity(schema, schemaName, namespace, version, null);
+
+            schemaEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
+        } else {
+            System.out.println("---Updating Kafka schema "  + schema);
+            LOG.info("Kafka schema {} already exists in Atlas. Updating it..", schemaQualifiedName);
+
+            AtlasEntity entity = getSchemaEntity(schema, schemaName, namespace, version, schemaEntity.getEntity());
+
+            schemaEntity.setEntity(entity);
+
+            schemaEntity = updateEntityInAtlas(schemaEntity);
+        }
+
+        return schemaEntity;
+    }
+
+    @VisibleForTesting
+    AtlasEntityWithExtInfo createOrUpdateField(Schema.Field field, String schemaName, String namespace, int version, String fullname) throws Exception {
+        fullname = concatFullname(field.name(), fullname, "");
+        String                 fieldQualifiedName = getFieldQualifiedName(metadataNamespace, fullname, schemaName + "-value", "v" + version);
+        AtlasEntityWithExtInfo fieldEntity        = findEntityInAtlas(KafkaDataTypes.AVRO_FIELD.getName(), fieldQualifiedName);
+
+        if (fieldEntity == null) {
+            System.out.println("---Adding Avro field " + fullname);
+            LOG.info("Importing Avro field: {}", fieldQualifiedName);
+
+            AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version ,null, fullname);
+
+            fieldEntity = createEntityInAtlas(new AtlasEntityWithExtInfo(entity));
+        } else {
+            System.out.println("---Updating Avro field "  + fullname);
+            LOG.info("Avro field {} already exists in Atlas. Updating it..", fieldQualifiedName);
+
+            AtlasEntity entity = getFieldEntity(field, schemaName, namespace, version, fieldEntity.getEntity(), fullname);
+
+            fieldEntity.setEntity(entity);
+
+            fieldEntity = updateEntityInAtlas(fieldEntity);
+        }
+
+        return fieldEntity;
+    }
+
+
+    @VisibleForTesting
     AtlasEntity getTopicEntity(String topic, AtlasEntity topicEntity) throws Exception {
         final AtlasEntity ret;
+        List<AtlasEntity> createdSchemas;
 
         if (topicEntity == null) {
             ret = new AtlasEntity(KafkaDataTypes.KAFKA_TOPIC.getName());
@@ -250,33 +342,140 @@ public class KafkaBridge {
             throw new Exception("Error while getting partition data for topic :" + topic, e);
         }
 
+        createdSchemas = findOrCreateAtlasSchema(topic);
+
+        if (CollectionUtils.isNotEmpty(createdSchemas)) {
+            ret.setAttribute(AVRO_SCHEMA, createdSchemas);
+            ret.setRelationshipAttribute(AVRO_SCHEMA, createdSchemas);
+        }
+
         return ret;
     }
 
     @VisibleForTesting
-    static String getTopicQualifiedName(String metadataNamespace, String topic) {
-        return String.format(FORMAT_KAFKA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace);
+    AtlasEntity getSchemaEntity(String schema, String schemaName, String namespace, int version, AtlasEntity schemaEntity) throws Exception {
+        final AtlasEntity ret;
+        List<AtlasEntity> createdFields = new ArrayList<>();
+
+        if (schemaEntity == null) {
+            ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
+        } else {
+            ret = schemaEntity;
+        }
+
+        Schema parsedSchema = new Schema.Parser().parse(schema);
+
+        String qualifiedName = getSchemaQualifiedName(metadataNamespace, schemaName + "-value", "v" + version);
+
+        if (namespace == null) {
+            namespace = (parsedSchema.getNamespace() != null) ? parsedSchema.getNamespace() : KAFKA_METADATA_NAMESPACE;
+        }
+
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+        ret.setAttribute(TYPE, parsedSchema.getType());
+        ret.setAttribute(NAMESPACE, namespace);
+        ret.setAttribute(NAME, parsedSchema.getName() + "(v" + version + ")");
+        ret.setAttribute(SCHEMA_VERSION_ID, version);
+
+        createdFields = createNestedFields(parsedSchema, schemaName, namespace, version, "");
+
+        if (CollectionUtils.isNotEmpty(createdFields)) {
+            ret.setRelationshipAttribute(FIELDS, createdFields);
+        }
+
+        return ret;
     }
 
-    private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) {
-        AtlasEntityWithExtInfo ret = null;
+    List<AtlasEntity> createNestedFields(Schema parsedSchema, String schemaName, String namespace, int version, String fullname) throws Exception {
+        List<AtlasEntity>  entityArray = new ArrayList<>();
+        AtlasEntityWithExtInfo fieldInAtlas;
+        JSONParser parser = new JSONParser();
 
-        try {
-            ret = findEntityInAtlas(KafkaDataTypes.KAFKA_TOPIC.getName(), topicQualifiedName);
+        for (Schema.Field field : parsedSchema.getFields()) {
+
+            if (field.schema().getType() == Schema.Type.ARRAY) {
+                LOG.debug("Array type detected for field {}", field.name());
+                String subfields = ((JSONObject) parser.parse(field.schema().toString())).get("items").toString();
+                Schema parsedSubSchema = new Schema.Parser().parse(subfields);
+
+                // use a local so the prefix of sibling fields is not polluted
+                String nestedFullname = concatFullname(field.name(), fullname, parsedSubSchema.getName());
+
+                entityArray.addAll(createNestedFields(parsedSubSchema, schemaName, namespace, version, nestedFullname));
+            }
+
+            else if (field.schema().getType() == Schema.Type.RECORD && !schemaName.equals(field.name())) {
+                LOG.debug("Nested record detected for field {}", field.name());
+                String nestedFullname = concatFullname(field.name(), fullname, "");
+                entityArray.addAll(createNestedFields(field.schema(), schemaName, namespace, version, nestedFullname));
+            }
+
+            else {
+                fieldInAtlas = createOrUpdateField(field, schemaName, namespace, version, fullname);
 
-            clearRelationshipAttributes(ret);
-        } catch (Exception e) {
-            ret = null; // entity doesn't exist in Atlas
+                entityArray.add(fieldInAtlas.getEntity());
+            }
+        }
+        entityArray.sort((o1, o2) -> {
+            if (o1.getAttribute(NAME) != null && o2.getAttribute(NAME) != null) {
+                String str1 = o1.getAttribute(NAME).toString();
+                String str2 = o2.getAttribute(NAME).toString();
+
+                return str1.compareTo(str2);
+            } else {
+                return 0;
+            }
+        });
+
+        return entityArray;
+    }
+
+    @VisibleForTesting
+    AtlasEntity getFieldEntity(Schema.Field field, String schemaName, String namespace, int version, AtlasEntity fieldEntity, String fullname) throws Exception {
+        AtlasEntity ret;
+
+        if (fieldEntity == null) {
+            ret = new AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName());
+        } else {
+            ret = fieldEntity;
         }
 
+        String qualifiedName = getFieldQualifiedName(metadataNamespace, fullname, schemaName + "-value", "v" + version);
+
+        ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+        ret.setAttribute(NAME, fullname + "(v" + version + ")");
+        // 'type' cannot be set from field.schema().getType() directly (the attribute expects array<avro_type>), so record it in the description instead
+        ret.setAttribute(DESCRIPTION_ATTR, field.schema().getType());
         return ret;
     }
 
     @VisibleForTesting
+    static String getTopicQualifiedName(String metadataNamespace, String topic) {
+        return String.format(FORMAT_KAFKA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace);
+    }
+
+    @VisibleForTesting
+    static String getSchemaQualifiedName(String metadataNamespace, String schema, String version) {
+        return String.format(FORMAT_KAFKA_SCHEMA_QUALIFIED_NAME, schema.toLowerCase(), version, metadataNamespace);
+    }
+
+    @VisibleForTesting
+    static String getFieldQualifiedName(String metadataNamespace, String field, String schemaName, String version) {
+        return String.format(FORMAT_KAFKA_FIELD_QUALIFIED_NAME, field.toLowerCase(), schemaName.toLowerCase(), version, metadataNamespace);
+    }
+
+    @VisibleForTesting
      AtlasEntityWithExtInfo findEntityInAtlas(String typeName, String qualifiedName) throws Exception {
-        Map<String, String> attributes = Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
+        AtlasEntityWithExtInfo ret = null;
+
+        try {
+            ret = atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+        } catch (Exception e) {
+            LOG.info("Entity {} of type {} not found in Atlas: {}", qualifiedName, typeName, e.getMessage());
+        }
 
-        return atlasClientV2.getEntityByAttribute(typeName, attributes);
+        return ret;
     }
 
     @VisibleForTesting
@@ -284,7 +483,6 @@ public class KafkaBridge {
         AtlasEntityWithExtInfo  ret      = null;
         EntityMutationResponse  response = atlasClientV2.createEntity(entity);
         List<AtlasEntityHeader> entities = response.getCreatedEntities();
-
         if (CollectionUtils.isNotEmpty(entities)) {
             AtlasEntityWithExtInfo getByGuidResponse = atlasClientV2.getEntityByGuid(entities.get(0).getGuid());
 
@@ -358,4 +556,66 @@ public class KafkaBridge {
             entity.getRelationshipAttributes().clear();
         }
     }
-}
+
+    private List<AtlasEntity> findOrCreateAtlasSchema(String schemaName) throws Exception {
+        List<AtlasEntity> entities = new ArrayList<>();
+        // Look up all registered schema versions for the topic's value subject
+        ArrayList<Integer> versions = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(httpClient, schemaName + "-value");
+
+        for (int version:versions) {
+            String kafkaSchema = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(httpClient, schemaName + "-value", version);
+
+            if(kafkaSchema != null) {
+                // Schema exists in Kafka Schema Registry
+                System.out.println("---Found Schema " + schemaName + "-value in Kafka Schema Registry with Version " + version);
+                LOG.info("Found Schema {}-value in Kafka Schema Registry with Version {}", schemaName, version);
+
+                AtlasEntityWithExtInfo atlasSchemaEntity = findEntityInAtlas(KafkaDataTypes.AVRO_SCHEMA.getName(), getSchemaQualifiedName(metadataNamespace, schemaName  + "-value", "v" + version));
+
+                if (atlasSchemaEntity != null) {
+                    // Schema exists in Kafka Schema Registry AND in Atlas
+                    System.out.println("---Found Entity avro_schema " + schemaName + " in Atlas");
+                    LOG.info("Found Entity avro_schema {} in Atlas", schemaName);
+                } else {
+                    // Schema exists in Kafka Schema Registry but NOT in Atlas
+                    System.out.println("---Entity avro_schema " + schemaName + " not found in Atlas");
+                    LOG.info("Entity avro_schema {} not found in Atlas", schemaName);
+                }
+
+                // createOrUpdateSchema() covers both cases: it creates the entity when missing and updates it otherwise
+                AtlasEntityWithExtInfo createdSchema = createOrUpdateSchema(kafkaSchema, schemaName, null, version);
+
+                entities.add(createdSchema.getEntity());
+            }
+        }
+
+        return entities;
+    }
+
+    private String concatFullname(String fieldName, String fullname, String subSchemaName) {
+        if (fullname.isEmpty()) {
+            return subSchemaName.isEmpty() ? fieldName : fieldName + "." + subSchemaName;
+        } else if (subSchemaName.isEmpty()) {
+            return fullname + "." + fieldName;
+        } else {
+            return fullname + "." + subSchemaName + "." + fieldName;
+        }
+    }
+}
\ No newline at end of file
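
Taken together, the three format strings yield qualified names like the following (package-private helpers; the metadata namespace "cm" and the topic name are illustrative):

    KafkaBridge.getTopicQualifiedName("cm", "Test_Topic");                        // "test_topic@cm"
    KafkaBridge.getSchemaQualifiedName("cm", "test_topic-value", "v1");           // "test_topic-value@v1@cm"
    KafkaBridge.getFieldQualifiedName("cm", "Field1", "test_topic-value", "v1");  // "field1@test_topic-value@v1@cm"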
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
new file mode 100644
index 0000000..d5d85d6
--- /dev/null
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/bridge/SchemaRegistryConnector.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka.bridge;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+public class SchemaRegistryConnector {
+    private static final String SCHEMA_KEY = "schema";
+    private static final Logger LOG        = LoggerFactory.getLogger(SchemaRegistryConnector.class);
+
+    static ArrayList<Integer> getVersionsKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject) throws IOException {
+        ArrayList<Integer> list = new ArrayList<>();
+        JSONParser parser = new JSONParser();
+
+        HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/");
+        getRequest.addHeader("accept", "application/json");
+        getRequest.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json");
+
+        try {
+            CloseableHttpResponse response = httpClient.execute(getRequest);
+
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                //found corresponding Schema version in Registry
+                try {
+                    BufferedReader br = new BufferedReader(
+                            new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8));
+                    JSONArray output = (JSONArray) parser.parse(br.readLine());
+                    int len = output.size();
+                    for (int i = 0; i < len; i++) {
+                        list.add(((Long) output.get(i)).intValue());
+                    }
+
+                    System.out.println("---Found following versions to schema: " + subject + " Versions: " + list.toString());
+                    LOG.info("Found following versions to schema: {} Versions: {}", subject, list.toString());
+
+                    EntityUtils.consumeQuietly(response.getEntity());
+                    response.close(); // close response
+                    return list;
+                } catch (Exception e) {
+                    System.out.println("---Error reading versions for schema: " + subject + " from Schema Registry");
+                    LOG.error("Error reading versions for schema: " + subject + " from Schema Registry: ", e);
+                }
+
+            } else if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+                // did not find any schema for the topic
+                System.out.println("---No schema versions found for schema: " + subject + " in Schema Registry");
+                LOG.info("No schema versions found for schema: {} in Schema Registry", subject);
+            } else {
+                // no connection to schema registry
+                System.out.println("---Cannot connect to schema registry");
+                LOG.warn("Cannot connect to schema registry");
+            }
+
+            EntityUtils.consumeQuietly(response.getEntity());
+            response.close();
+        }
+        catch (Exception e) {
+            System.out.println("---Error getting versions for schema: " + subject + " from Schema Registry");
+            LOG.error("Error getting versions for schema: " + subject + " from Schema Registry: ", e);
+        }
+        return list;
+    }
+
+    static String getSchemaFromKafkaSchemaRegistry(CloseableHttpClient httpClient, String subject, int version) throws IOException {
+        HttpGet getRequest = new HttpGet("http://" + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME + "/subjects/" + subject + "/versions/" + version);
+        getRequest.addHeader("accept", "application/json");
+        getRequest.addHeader("Content-Type", "application/vnd.schemaregistry.v1+json");
+        JSONParser parser = new JSONParser();
+
+        CloseableHttpResponse response = httpClient.execute(getRequest);
+
+        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK){
+            //found corresponding Schema in Registry
+            try {
+                BufferedReader br = new BufferedReader(
+                        new InputStreamReader((response.getEntity().getContent()), StandardCharsets.UTF_8));
+                JSONObject output = (JSONObject) parser.parse(br.readLine());
+
+                EntityUtils.consumeQuietly(response.getEntity());
+                response.close();
+                return output.get(SCHEMA_KEY).toString();
+            } catch (Exception e) {
+                System.out.println("---Error reading schema: " + subject + " from Schema Registry");
+                LOG.error("Error reading schema: " + subject + " from Schema Registry: ", e);
+            }
+
+        } else if (response.getStatusLine().getStatusCode() == HttpStatus.SC_NOT_FOUND) {
+            // did not find any schema for the topic
+            System.out.println("---Cannot find the corresponding schema for: " + subject + " in Schema Registry");
+            LOG.info("Cannot find the corresponding schema for: {} in Schema Registry", subject);
+        } else {
+            // any other error when connecting to schema registry
+            System.out.println("---Cannot connect to schema registry at: " + KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
+            LOG.warn("Cannot connect to schema registry at: {}", KafkaBridge.KAFKA_SCHEMA_REGISTRY_HOSTNAME);
+        }
+
+        EntityUtils.consumeQuietly(response.getEntity());
+        response.close();
+        return null;
+    }
+}
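
The connector expects the standard Schema Registry payload shapes (mirrored by the mocks in KafkaBridgeTest further below): a bare JSON array from the versions endpoint, and an object whose "schema" member carries the Avro schema from the version endpoint. A self-contained json-simple sketch of the version-list handling, with the payload as an example response body:

    JSONParser parser = new JSONParser();
    JSONArray output = (JSONArray) parser.parse("[1, 2, 3, 4]"); // example body of GET /subjects/<subject>/versions/
    ArrayList<Integer> versions = new ArrayList<>();
    for (Object o : output) {
        versions.add(((Long) o).intValue()); // json-simple parses JSON integers as Long
    }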
diff --git a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java
index 0f81b4c..200a759 100644
--- a/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java
+++ b/addons/kafka-bridge/src/main/java/org/apache/atlas/kafka/model/KafkaDataTypes.java
@@ -23,7 +23,9 @@ package org.apache.atlas.kafka.model;
  */
 public enum KafkaDataTypes {
     // Classes
-    KAFKA_TOPIC;
+    KAFKA_TOPIC,
+    AVRO_SCHEMA,
+    AVRO_FIELD;
 
     public String getName() {
         return name().toLowerCase();
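
Since getName() simply lowercases the constant, the two new values map directly onto the Atlas Avro typedef names used by the bridge:

    KafkaDataTypes.KAFKA_TOPIC.getName(); // "kafka_topic"
    KafkaDataTypes.AVRO_SCHEMA.getName(); // "avro_schema"
    KafkaDataTypes.AVRO_FIELD.getName();  // "avro_field"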
diff --git a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
index f86ceb5..adbaddd 100644
--- a/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
+++ b/addons/kafka-bridge/src/test/java/org/apache/atlas/kafka/bridge/KafkaBridgeTest.java
@@ -25,11 +25,22 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.utils.KafkaUtils;
+import org.apache.avro.Schema;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 
 
@@ -43,10 +54,21 @@ import static org.testng.Assert.assertEquals;
 public class KafkaBridgeTest {
 
     private static final String TEST_TOPIC_NAME = "test_topic";
-    public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
-            getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
     private static final String CLUSTER_NAME = "primary";
     private static final String TOPIC_QUALIFIED_NAME = KafkaBridge.getTopicQualifiedName(CLUSTER_NAME, TEST_TOPIC_NAME);
+    private static final String TEST_SCHEMA_NAME = "test_topic-value";
+    private static final int TEST_SCHEMA_VERSION = 1;
+    private static final String TEST_NAMESPACE = "test_namespace";
+    private static final ArrayList<Integer> TEST_SCHEMA_VERSION_LIST = new ArrayList<>(Arrays.asList(1, 2, 3, 4));
+    private static final String TEST_SCHEMA = "{\"name\":\"test\",\"namespace\":\"testing\",\"type\":\"record\",\"fields\":[{\"name\":\"Field1\",\"type\":\"string\"},{\"name\":\"Field2\",\"type\":\"int\"}]}";
+    private static final Schema.Field TEST_FIELD_NAME = new Schema.Parser().parse(TEST_SCHEMA).getField("Field1");
+    private static final String TEST_FIELD_FULLNAME = "Field1";
+    public static final AtlasEntity.AtlasEntityWithExtInfo TOPIC_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+            getTopicEntityWithGuid("0dd466a4-3838-4537-8969-6abb8b9e9185"));
+    public static final AtlasEntity.AtlasEntityWithExtInfo SCHEMA_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+            getSchemaEntityWithGuid("2a9894bb-e535-4aa1-a00b-a7d21ac20738"));
+    public static final AtlasEntity.AtlasEntityWithExtInfo FIELD_WITH_EXT_INFO = new AtlasEntity.AtlasEntityWithExtInfo(
+            getFieldEntityWithGuid("41d1011f-d428-4f5a-9578-0a0a0439147f"));
 
     @BeforeMethod
     public void initializeMocks() {
@@ -59,6 +81,18 @@ public class KafkaBridgeTest {
         return ret;
     }
 
+    private static AtlasEntity getSchemaEntityWithGuid(String guid) {
+        AtlasEntity ret = new AtlasEntity(KafkaDataTypes.AVRO_SCHEMA.getName());
+        ret.setGuid(guid);
+        return ret;
+    }
+
+    private static AtlasEntity getFieldEntityWithGuid(String guid) {
+        AtlasEntity ret = new AtlasEntity(KafkaDataTypes.AVRO_FIELD.getName());
+        ret.setGuid(guid);
+        return ret;
+    }
+
     @Test
     public void testImportTopic() throws Exception {
         KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
@@ -115,6 +149,58 @@ public class KafkaBridgeTest {
     }
 
     @Test
+    public void testCreateSchema() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(SCHEMA_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockCreateResponse.getCreatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.createEntity(any()))
+                .thenReturn(mockCreateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(SCHEMA_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(SCHEMA_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateSchema(TEST_SCHEMA, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION);
+
+        assertEquals(SCHEMA_WITH_EXT_INFO, ret);
+    }
+
+    @Test
+    public void testCreateField() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockCreateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(FIELD_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockCreateResponse.getCreatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.createEntity(any()))
+                .thenReturn(mockCreateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(FIELD_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(FIELD_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateField(TEST_FIELD_NAME, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION, TEST_FIELD_FULLNAME);
+
+        assertEquals(FIELD_WITH_EXT_INFO, ret);
+    }
+
+    @Test
     public void testUpdateTopic() throws Exception {
         KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
         when(mockKafkaUtils.listAllTopics())
@@ -141,4 +227,122 @@ public class KafkaBridgeTest {
 
         assertEquals(TOPIC_WITH_EXT_INFO, ret);
     }
+
+    @Test
+    public void testUpdateSchema() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockUpdateResponseSchema = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeaderSchema = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeaderSchema.getGuid()).thenReturn(SCHEMA_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockUpdateResponseSchema.getUpdatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeaderSchema));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.AVRO_SCHEMA.getName()), any()))
+                .thenReturn(SCHEMA_WITH_EXT_INFO);
+        when(mockAtlasClientV2.updateEntity(SCHEMA_WITH_EXT_INFO))
+                .thenReturn(mockUpdateResponseSchema);
+        when(mockAtlasClientV2.getEntityByGuid(SCHEMA_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(SCHEMA_WITH_EXT_INFO);
+
+        EntityMutationResponse mockUpdateResponseField = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeaderField = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeaderField.getGuid()).thenReturn(FIELD_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockUpdateResponseField.getUpdatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeaderField));
+
+        when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.AVRO_FIELD.getName()), any()))
+                .thenReturn(FIELD_WITH_EXT_INFO);
+        when(mockAtlasClientV2.updateEntity(FIELD_WITH_EXT_INFO))
+                .thenReturn(mockUpdateResponseField);
+        when(mockAtlasClientV2.getEntityByGuid(FIELD_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(FIELD_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateSchema(TEST_SCHEMA, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION);
+
+        assertEquals(SCHEMA_WITH_EXT_INFO, ret);
+    }
+
+    @Test
+    public void testUpdateField() throws Exception {
+        KafkaUtils mockKafkaUtils = mock(KafkaUtils.class);
+        when(mockKafkaUtils.listAllTopics())
+                .thenReturn(Collections.singletonList(TEST_TOPIC_NAME));
+        when(mockKafkaUtils.getPartitionCount(TEST_TOPIC_NAME))
+                .thenReturn(3);
+
+        EntityMutationResponse mockUpdateResponse = mock(EntityMutationResponse.class);
+        AtlasEntityHeader mockAtlasEntityHeader = mock(AtlasEntityHeader.class);
+        when(mockAtlasEntityHeader.getGuid()).thenReturn(FIELD_WITH_EXT_INFO.getEntity().getGuid());
+        when(mockUpdateResponse.getUpdatedEntities())
+                .thenReturn(Collections.singletonList(mockAtlasEntityHeader));
+
+        AtlasClientV2 mockAtlasClientV2 = mock(AtlasClientV2.class);
+        when(mockAtlasClientV2.getEntityByAttribute(eq(KafkaDataTypes.AVRO_FIELD.getName()), any()))
+                .thenReturn(FIELD_WITH_EXT_INFO);
+        when(mockAtlasClientV2.updateEntity(any()))
+                .thenReturn(mockUpdateResponse);
+        when(mockAtlasClientV2.getEntityByGuid(FIELD_WITH_EXT_INFO.getEntity().getGuid()))
+                .thenReturn(FIELD_WITH_EXT_INFO);
+
+        KafkaBridge bridge = new KafkaBridge(ApplicationProperties.get(), mockAtlasClientV2, mockKafkaUtils);
+        AtlasEntity.AtlasEntityWithExtInfo ret = bridge.createOrUpdateField(TEST_FIELD_NAME, TEST_SCHEMA_NAME, TEST_NAMESPACE, TEST_SCHEMA_VERSION, TEST_FIELD_FULLNAME);
+
+        assertEquals(FIELD_WITH_EXT_INFO, ret);
+    }
+
+    @Test
+    public void testGetSchemas() throws Exception {
+        CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+        when(mockResponse.getStatusLine())
+                .thenReturn(mock(StatusLine.class));
+        when(mockResponse.getStatusLine().getStatusCode())
+                .thenReturn(HttpStatus.SC_OK);
+        when(mockResponse.getEntity())
+                .thenReturn(mock(HttpEntity.class));
+        when(mockResponse.getEntity().getContent())
+                .thenReturn(new ByteArrayInputStream(("{\"subject\":\"test-value\",\"version\":1,\"id\":1,\"schema\":" + TEST_SCHEMA + "}").getBytes(StandardCharsets.UTF_8)));
+
+        CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class);
+        when(mockHttpClient.execute(any()))
+                .thenReturn(mockResponse);
+        when(mockHttpClient.getConnectionManager())
+                .thenReturn(mock(ClientConnectionManager.class));
+
+        String ret = SchemaRegistryConnector.getSchemaFromKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME,TEST_SCHEMA_VERSION);
+
+        assertEquals(TEST_SCHEMA, ret);
+    }
+
+    @Test
+    public void testGetSchemaVersions() throws Exception {
+        CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+        when(mockResponse.getStatusLine())
+                .thenReturn(mock(StatusLine.class));
+        when(mockResponse.getStatusLine().getStatusCode())
+                .thenReturn(HttpStatus.SC_OK);
+        when(mockResponse.getEntity())
+                .thenReturn(mock(HttpEntity.class));
+        when(mockResponse.getEntity().getContent())
+                .thenReturn(new ByteArrayInputStream(TEST_SCHEMA_VERSION_LIST.toString().getBytes(StandardCharsets.UTF_8)));
+
+        CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class);
+        when(mockHttpClient.execute(any()))
+                .thenReturn(mockResponse);
+        when(mockHttpClient.getConnectionManager())
+                .thenReturn(mock(ClientConnectionManager.class));
+
+        ArrayList<Integer> ret = SchemaRegistryConnector.getVersionsKafkaSchemaRegistry(mockHttpClient, TEST_SCHEMA_NAME);
+
+        assertEquals(TEST_SCHEMA_VERSION_LIST, ret);
+    }
+
 }
\ No newline at end of file
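
As a reading aid for the field tests above: KafkaBridge.concatFullname (a private helper) flattens nested Avro structures into dotted paths. The field and record names below are hypothetical:

    concatFullname("id", "", "");                  // "id"                    (top-level field)
    concatFullname("items", "", "LineItem");       // "items.LineItem"        (array-of-record step)
    concatFullname("price", "items.LineItem", ""); // "items.LineItem.price"  (leaf inside the record)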
diff --git a/pom.xml b/pom.xml
index 3c17648..86519da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -680,6 +680,7 @@
         <aopalliance.version>1.0</aopalliance.version>
         <aspectj.runtime.version>1.8.7</aspectj.runtime.version>
         <atlas.surefire.options></atlas.surefire.options>
+        <avro.version>1.7.5</avro.version>
         <calcite.version>1.16.0</calcite.version>
         <checkstyle.failOnViolation>false</checkstyle.failOnViolation>
         <codehaus.woodstox.stax2-api.version>3.1.4</codehaus.woodstox.stax2-api.version>
@@ -728,6 +729,7 @@
         <jetty.version>9.4.31.v20200723</jetty.version>
         <joda-time.version>2.10.6</joda-time.version>
         <json.version>3.2.11</json.version>
+        <json-simple.version>1.1.1</json-simple.version>
         <jsr.version>1.1</jsr.version>
         <junit.version>4.13.1</junit.version>
         <kafka.scala.binary.version>2.12</kafka.scala.binary.version>