You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/20 14:57:12 UTC

[nifi] 01/02: NIFI-7221 Initial work

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1fe79021b59fb8763b4af3e911658800bed3d41a
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Wed Mar 4 19:25:26 2020 +0100

    NIFI-7221 Initial work
---
 .../org/apache/nifi/schema/access/SchemaField.java |  1 +
 .../serialization/record/SchemaIdentifier.java     | 16 ++++-
 .../record/StandardSchemaIdentifier.java           | 53 +++++++++++++--
 ...ortonworksAttributeSchemaReferenceStrategy.java | 52 ++++++++-------
 .../HortonworksAttributeSchemaReferenceWriter.java | 29 +++++----
 .../HortonworksEncodedSchemaReferenceStrategy.java | 75 ++++++++++++++++------
 .../HortonworksEncodedSchemaReferenceWriter.java   | 65 ++++++++++++-------
 .../hortonworks/HortonworksSchemaRegistry.java     | 50 ++++++++++++++-
 8 files changed, 257 insertions(+), 84 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
index 1844eea..f577f05 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -22,6 +22,7 @@ public enum SchemaField {
     SCHEMA_TEXT_FORMAT("Schema Text Format"),
     SCHEMA_NAME("Schema Name"),
     SCHEMA_IDENTIFIER("Schema Identifier"),
+    SCHEMA_VERSION_ID("Schema-Version Identifier"),
     SCHEMA_VERSION("Schema Version"),
     SCHEMA_BRANCH_NAME("Schema Branch Name");
 
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index bca408a..4fb5371 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -39,12 +39,22 @@ public interface SchemaIdentifier {
     OptionalInt getVersion();
 
     /**
+     * @return the schema version ID of the schema, if one has been defined.
+     */
+    OptionalLong getSchemaVersionId();
+
+    /**
      * @return the name of the branch where the schema is located, if one has been defined
      */
     Optional<String> getBranch();
 
+    /**
+     * @return the protocol used to get this schema identifier
+     */
+    Integer getProtocol();
+
 
-    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null);
+    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
 
     static Builder builder() {
         return new StandardSchemaIdentifier.Builder();
@@ -61,8 +71,12 @@ public interface SchemaIdentifier {
 
         Builder version(Integer version);
 
+        Builder schemaVersionId(Long schemaVersionId);
+
         Builder branch(String branch);
 
+        Builder protocol(Integer protocol);
+
         SchemaIdentifier build();
 
     }
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 712486b..0800982 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -25,15 +25,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     private final Optional<String> name;
     private final OptionalLong identifier;
     private final OptionalInt version;
+    private final OptionalLong schemaVersionId;
     private final Optional<String> branch;
+    private final int protocol;
 
-    StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) {
+    StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
+            final Long schemaVersionId, final String branch, final int protocol) {
         this.name = Optional.ofNullable(name);
-        this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
-        this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
+        this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
+        this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
+        this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
         this.branch = Optional.ofNullable(branch);
+        this.protocol = protocol;
 
-        if (this.name == null && this.identifier == null) {
+        if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
             throw new IllegalStateException("Name or Identifier must be provided");
         }
     }
@@ -54,13 +59,24 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     }
 
     @Override
+    public OptionalLong getSchemaVersionId() {
+        return schemaVersionId;
+    }
+
+    @Override
     public Optional<String> getBranch() {
         return branch;
     }
 
     @Override
+    public Integer getProtocol() {
+        return protocol;
+    }
+
+    @Override
     public int hashCode() {
-        return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode();
+        return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
+                + 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
     }
 
     @Override
@@ -78,9 +94,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         return getName().equals(other.getName())
                 && getIdentifier().equals(other.getIdentifier())
                 && getVersion().equals(other.getVersion())
+                && getSchemaVersionId().equals(other.getSchemaVersionId())
                 && getBranch().equals(other.getBranch());
     }
 
+    @Override
+    public String toString() {
+        return "[ name = " + name + ", "
+                + "identifier = " + identifier + ", "
+                + "version = " + version + ", "
+                + "schemaVersionId = " + schemaVersionId + ", "
+                + "branch = " + branch + ", "
+                + "protocol = " + protocol + " ]";
+    }
+
     /**
      * Builder to create instances of SchemaIdentifier.
      */
@@ -90,6 +117,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         private String branch;
         private Long identifier;
         private Integer version;
+        private Long schemaVersionId;
+        private Integer protocol;
 
         @Override
         public SchemaIdentifier.Builder name(final String name) {
@@ -116,8 +145,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         }
 
         @Override
+        public SchemaIdentifier.Builder schemaVersionId(final Long schemaVersionId) {
+            this.schemaVersionId = schemaVersionId;
+            return this;
+        }
+
+        @Override
+        public SchemaIdentifier.Builder protocol(final Integer protocol) {
+            this.protocol = protocol;
+            return this;
+        }
+
+        @Override
         public SchemaIdentifier build() {
-            return new StandardSchemaIdentifier(name, identifier, version, branch);
+            return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol == null ? -1 : protocol); // unset protocol -> -1, avoids NPE on unboxing
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index f4fdfcb..b20d683 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -17,10 +17,6 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -28,16 +24,20 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
     private final Set<SchemaField> schemaFields;
 
     public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
     public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
     public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
-
+    public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
 
     private final SchemaRegistry schemaRegistry;
-
+    static final int LATEST_PROTOCOL_VERSION = 3;
 
     public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
@@ -45,6 +45,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
         schemaFields = new HashSet<>();
         schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
         schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
         schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
     }
 
@@ -57,7 +58,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
         final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
         final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
         final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
+        final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
+        if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
             throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
                 + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
         }
@@ -68,28 +70,34 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
         }
 
         final int protocol = Integer.parseInt(schemaProtocol);
-        if (protocol != 1) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1.");
+        if (protocol > LATEST_PROTOCOL_VERSION) {
+            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                    + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
         }
 
-        if (!isNumber(schemaIdentifier)) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Schema Identifier number");
-        }
+        SchemaIdentifier identifier;
+        if (!isNumber(schemaVersionId)) {
+            if (!isNumber(schemaIdentifier)) {
+                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+                    + schemaProtocol + "', which is not a valid Schema Identifier number");
+            }
 
-        if (!isNumber(schemaVersion)) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Schema Version number");
-        }
+            if (!isNumber(schemaVersion)) {
+                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+                    + schemaProtocol + "', which is not a valid Schema Version number");
+            }
 
-        final long schemaId = Long.parseLong(schemaIdentifier);
-        final int version = Integer.parseInt(schemaVersion);
+            final long schemaId = Long.parseLong(schemaIdentifier);
+            final int version = Integer.parseInt(schemaVersion);
+            identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
+        } else {
+            final long svi = Long.parseLong(schemaVersionId);
+            identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
+        }
 
-        final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
         final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
         if (schema == null) {
-            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
+            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + identifier.toString() + "'");
         }
 
         return schema;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index bea2f96..ad4558f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -17,9 +17,6 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.EnumSet;
@@ -27,9 +24,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    static final int LATEST_PROTOCOL_VERSION = 1;
+    static final int LATEST_PROTOCOL_VERSION = 3;
     static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
 
     @Override
@@ -46,23 +46,30 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
 
         attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
         attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
 
         if (id.getBranch().isPresent()) {
             attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
         }
 
+        if (id.getSchemaVersionId().isPresent()) {
+            attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
+        }
+
         return attributes;
     }
 
     @Override
     public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
-        final SchemaIdentifier id = schema.getIdentifier();
-        if (!id.getIdentifier().isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
-        }
-        if (!id.getVersion().isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+        final SchemaIdentifier identifier = schema.getIdentifier();
+
+        if (!identifier.getSchemaVersionId().isPresent()) {
+            if (!identifier.getIdentifier().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
+            }
+            if (!identifier.getVersion().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 077016a..8f3c1b4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -17,11 +17,6 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.stream.io.StreamUtils;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -30,8 +25,13 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
 public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
-    private static final int LATEST_PROTOCOL_VERSION = 1;
+    private static final int LATEST_PROTOCOL_VERSION = 3;
 
     private final Set<SchemaField> schemaFields;
     private final SchemaRegistry schemaRegistry;
@@ -47,28 +47,67 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
 
     @Override
     public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-        final byte[] buffer = new byte[13];
+        final byte[] buffer = new byte[1];
         try {
             StreamUtils.fillBuffer(contentStream, buffer);
         } catch (final IOException ioe) {
-            throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe);
+            throw new SchemaNotFoundException("Could not read first byte from stream", ioe);
         }
 
         // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
-        // as it is provided at:
-        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+        // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
         final ByteBuffer bb = ByteBuffer.wrap(buffer);
         final int protocolVersion = bb.get();
-        if (protocolVersion != 1) {
-            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
-        }
+        SchemaIdentifier schemaIdentifier;
+
+        switch(protocolVersion) {
+            case 1:
+                final byte[] bufferv1 = new byte[12];
+
+                try {
+                    StreamUtils.fillBuffer(contentStream, bufferv1);
+                } catch (final IOException ioe) {
+                    throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+                }
+                final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1);
+
+                final long schemaId = bbv1.getLong();
+                final int schemaVersion = bbv1.getInt();
+                schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
+                return schemaRegistry.retrieveSchema(schemaIdentifier);
+
+            case 2:
+                final byte[] bufferv2 = new byte[8];
 
-        final long schemaId = bb.getLong();
-        final int schemaVersion = bb.getInt();
+                try {
+                    StreamUtils.fillBuffer(contentStream, bufferv2);
+                } catch (final IOException ioe) {
+                    throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+                }
+                final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2);
 
-        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
-        return schemaRegistry.retrieveSchema(schemaIdentifier);
+                final long sviLong = bbv2.getLong();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
+                return schemaRegistry.retrieveSchema(schemaIdentifier);
+
+            case 3:
+                final byte[] bufferv3 = new byte[4];
+
+                try {
+                    StreamUtils.fillBuffer(contentStream, bufferv3);
+                } catch (final IOException ioe) {
+                    throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+                }
+                final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3);
+
+                final int sviInt = bbv3.getInt();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
+                return schemaRegistry.retrieveSchema(schemaIdentifier);
+
+            default:
+                throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
+        }
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index cb4ed4e..99dbd1f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,38 +17,57 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
 import java.util.Set;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 1;
+    private static final int LATEST_PROTOCOL_VERSION = 3;
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
         final SchemaIdentifier identifier = schema.getIdentifier();
-        final Long id = identifier.getIdentifier().getAsLong();
-        final Integer version = identifier.getVersion().getAsInt();
 
-        // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
-        // as it is provided at:
-        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
-        final ByteBuffer bb = ByteBuffer.allocate(13);
-        bb.put((byte) LATEST_PROTOCOL_VERSION);
-        bb.putLong(id);
-        bb.putInt(version);
+        // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
+        // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
+        switch(identifier.getProtocol()) {
+            case 1:
+                final Long id = identifier.getIdentifier().getAsLong();
+                final Integer version = identifier.getVersion().getAsInt();
+                final ByteBuffer bbv1 = ByteBuffer.allocate(13);
+                bbv1.put((byte) 1);
+                bbv1.putLong(id);
+                bbv1.putInt(version);
+                out.write(bbv1.array());
+                return;
+            case 2:
+                final Long sviV2 = identifier.getSchemaVersionId().getAsLong();
+                final ByteBuffer bbv2 = ByteBuffer.allocate(9);
+                bbv2.put((byte) 2);
+                bbv2.putLong(sviV2);
+                out.write(bbv2.array());
+                return;
+            case 3:
+                final Long sviV3 = identifier.getSchemaVersionId().getAsLong();
+                final ByteBuffer bbv3 = ByteBuffer.allocate(5);
+                bbv3.put((byte) 3);
+                bbv3.putInt(sviV3.intValue());
+                out.write(bbv3.array());
+                return;
+            default:
+                throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
+        }
+
 
-        out.write(bb.array());
     }
 
     @Override
@@ -59,14 +78,14 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
     @Override
     public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
         final SchemaIdentifier identifier = schema.getIdentifier();
-        final OptionalLong identifierOption = identifier.getIdentifier();
-        if (!identifierOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
-        }
 
-        final OptionalInt versionOption = identifier.getVersion();
-        if (!versionOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+        if(!identifier.getSchemaVersionId().isPresent()) {
+            if (!identifier.getIdentifier().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
+            }
+            if (!identifier.getVersion().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index 0e42afc..c26c55a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.schemaregistry.hortonworks;
 
 import com.google.common.collect.ImmutableMap;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
 import com.hortonworks.registries.schemaregistry.SchemaMetadata;
 import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
 import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
@@ -64,8 +65,13 @@ import java.util.concurrent.TimeUnit;
 @Tags({"schema", "registry", "avro", "hortonworks", "hwx"})
 @CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry")
 public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
-    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT,
-        SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME,
+            SchemaField.SCHEMA_BRANCH_NAME,
+            SchemaField.SCHEMA_TEXT,
+            SchemaField.SCHEMA_TEXT_FORMAT,
+            SchemaField.SCHEMA_IDENTIFIER,
+            SchemaField.SCHEMA_VERSION,
+            SchemaField.SCHEMA_VERSION_ID); // the only member added by this change; SchemaField labels it "Schema-Version Identifier"
 
     private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
 
@@ -420,6 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName.get())
                 .branch(schemaBranchName.orElse(null))
                 .version(versionInfo.getVersion())
+                .protocol(schemaIdentifier.getProtocol())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -470,6 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName)
                 .id(schemaId.getAsLong())
                 .version(version.getAsInt())
+                .protocol(schemaIdentifier.getProtocol())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -481,13 +489,49 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
     @Override
     public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
-        if (schemaIdentifier.getIdentifier().isPresent()) {
+        if (schemaIdentifier.getSchemaVersionId().isPresent()) { // tested first, so a schema-version ID takes precedence over the other lookups
+            return retrieveSchemaBySchemaVersionId(schemaIdentifier);
+        } else if (schemaIdentifier.getIdentifier().isPresent()) { // no schema-version ID: use the schema identifier; name lookup is the last resort
             return retrieveSchemaByIdAndVersion(schemaIdentifier);
         } else {
             return retrieveSchemaByName(schemaIdentifier);
         }
     }
 
+    /**
+     * Resolves a schema from the registry using the schema-version ID of the given identifier.
+     * Registry failures are passed to handleException; null is returned only if that call returns.
+     * @param schemaIdentifier identifier whose schema-version ID is read with getAsLong()
+     * @return the schema held in schemaNameToSchemaMap for the resolved identifier and schema text
+     */
+    private RecordSchema retrieveSchemaBySchemaVersionId(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
+        final SchemaRegistryClient registryClient = getClient();
+        final long versionId = schemaIdentifier.getSchemaVersionId().getAsLong();
+        final SchemaIdVersion idVersion = new SchemaIdVersion(versionId);
+
+        final SchemaVersionInfo info;
+        final String name;
+        try {
+            info = registryClient.getSchemaVersionInfo(idVersion);
+            name = info.getName();
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with Schema Version ID '" + versionId + "'", e);
+            return null;
+        }
+
+        final String text = info.getSchemaText();
+        final SchemaIdentifier resolvedIdentifier = SchemaIdentifier.builder()
+                .name(name)
+                .id(info.getSchemaMetadataId())
+                .version(info.getVersion())
+                .schemaVersionId(versionId)
+                .protocol(schemaIdentifier.getProtocol())
+                .build();
+        final Tuple<SchemaIdentifier, String> cacheKey = new Tuple<>(resolvedIdentifier, text);
+        return schemaNameToSchemaMap.computeIfAbsent(cacheKey, key ->
+                AvroTypeUtil.createSchema(new Schema.Parser().parse(text), text, resolvedIdentifier));
+    }
+
     private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) {
         final StringBuilder builder = new StringBuilder(baseMessage)
                 .append(" with name '")