You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/20 14:57:11 UTC

[nifi] branch master updated (f694e64 -> 2feeb57)

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from f694e64  NIFI-7187 adding missing version strings from accumulo bundle pom - Removed Cat X JSON.org dep inclusion which seems to not be necessary - Updated a ton of easier/safer looking deps - Updated tika due to CVE
     new 1fe7902  NIFI-7221 Initial work
     new 2feeb57  NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry - Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java - Addressing review feedback

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/nifi/schema/access/SchemaField.java |   1 +
 .../serialization/record/SchemaIdentifier.java     |   9 +-
 .../record/StandardSchemaIdentifier.java           |  40 ++++--
 .../SchemaRegistryRecordSetWriter.java             |  72 +++++++++--
 ...ortonworksAttributeSchemaReferenceStrategy.java |  63 ++++++----
 .../HortonworksAttributeSchemaReferenceWriter.java |  68 ++++++++---
 .../HortonworksEncodedSchemaReferenceStrategy.java |  67 ++++++++--
 .../HortonworksEncodedSchemaReferenceWriter.java   |  87 +++++++++----
 .../schema/access/HortonworksProtocolVersions.java |  56 +++++++++
 .../access/AbstractSchemaAccessStrategyTest.java   |   6 +-
 ...ortonworksAttributeSchemaReferenceStrategy.java | 110 ++++++++++++++++-
 ...tHortonworksAttributeSchemaReferenceWriter.java | 111 +++++++++++++++--
 ...estHortonworksEncodedSchemaReferenceWriter.java | 136 ++++++++++++++++++++-
 .../hortonworks/HortonworksSchemaRegistry.java     |  49 +++++++-
 .../org/apache/nifi/avro/AvroRecordSetWriter.java  |   2 +-
 .../org/apache/nifi/csv/CSVRecordSetWriter.java    |  16 +--
 .../org/apache/nifi/json/JsonRecordSetWriter.java  |  18 +--
 .../org/apache/nifi/xml/XMLRecordSetWriter.java    |   2 +-
 18 files changed, 778 insertions(+), 135 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java


[nifi] 01/02: NIFI-7221 Initial work

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1fe79021b59fb8763b4af3e911658800bed3d41a
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Wed Mar 4 19:25:26 2020 +0100

    NIFI-7221 Initial work
---
 .../org/apache/nifi/schema/access/SchemaField.java |  1 +
 .../serialization/record/SchemaIdentifier.java     | 16 ++++-
 .../record/StandardSchemaIdentifier.java           | 53 +++++++++++++--
 ...ortonworksAttributeSchemaReferenceStrategy.java | 52 ++++++++-------
 .../HortonworksAttributeSchemaReferenceWriter.java | 29 +++++----
 .../HortonworksEncodedSchemaReferenceStrategy.java | 75 ++++++++++++++++------
 .../HortonworksEncodedSchemaReferenceWriter.java   | 65 ++++++++++++-------
 .../hortonworks/HortonworksSchemaRegistry.java     | 50 ++++++++++++++-
 8 files changed, 257 insertions(+), 84 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
index 1844eea..f577f05 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -22,6 +22,7 @@ public enum SchemaField {
     SCHEMA_TEXT_FORMAT("Schema Text Format"),
     SCHEMA_NAME("Schema Name"),
     SCHEMA_IDENTIFIER("Schema Identifier"),
+    SCHEMA_VERSION_ID("Schema-Version Identifier"),
     SCHEMA_VERSION("Schema Version"),
     SCHEMA_BRANCH_NAME("Schema Branch Name");
 
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index bca408a..4fb5371 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -39,12 +39,22 @@ public interface SchemaIdentifier {
     OptionalInt getVersion();
 
     /**
+     * @return the schema version ID of the schema, if one has been defined.
+     */
+    OptionalLong getSchemaVersionId();
+
+    /**
      * @return the name of the branch where the schema is located, if one has been defined
      */
     Optional<String> getBranch();
 
+    /**
+     * @return the protocol used to get this schema identifier
+     */
+    Integer getProtocol();
+
 
-    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null);
+    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
 
     static Builder builder() {
         return new StandardSchemaIdentifier.Builder();
@@ -61,8 +71,12 @@ public interface SchemaIdentifier {
 
         Builder version(Integer version);
 
+        Builder schemaVersionId(Long schemaVersionId);
+
         Builder branch(String branch);
 
+        Builder protocol(Integer protocol);
+
         SchemaIdentifier build();
 
     }
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 712486b..0800982 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -25,15 +25,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     private final Optional<String> name;
     private final OptionalLong identifier;
     private final OptionalInt version;
+    private final OptionalLong schemaVersionId;
     private final Optional<String> branch;
+    private final int protocol;
 
-    StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) {
+    StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
+            final Long schemaVersionId, final String branch, final int protocol) {
         this.name = Optional.ofNullable(name);
-        this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
-        this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
+        this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
+        this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
+        this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
         this.branch = Optional.ofNullable(branch);
+        this.protocol = protocol;
 
-        if (this.name == null && this.identifier == null) {
+        if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
             throw new IllegalStateException("Name or Identifier must be provided");
         }
     }
@@ -54,13 +59,24 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     }
 
     @Override
+    public OptionalLong getSchemaVersionId() {
+        return schemaVersionId;
+    }
+
+    @Override
     public Optional<String> getBranch() {
         return branch;
     }
 
     @Override
+    public Integer getProtocol() {
+        return protocol;
+    }
+
+    @Override
     public int hashCode() {
-        return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode();
+        return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
+                + 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
     }
 
     @Override
@@ -78,9 +94,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         return getName().equals(other.getName())
                 && getIdentifier().equals(other.getIdentifier())
                 && getVersion().equals(other.getVersion())
+                && getSchemaVersionId().equals(other.getSchemaVersionId())
                 && getBranch().equals(other.getBranch());
     }
 
+    @Override
+    public String toString() {
+        return "[ name = " + name + ", "
+                + "identifier = " + identifier + ", "
+                + "version = " + version + ", "
+                + "schemaVersionId = " + schemaVersionId + ", "
+                + "branch = " + branch + ", "
+                + "protocol = " + protocol + " ]";
+    }
+
     /**
      * Builder to create instances of SchemaIdentifier.
      */
@@ -90,6 +117,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         private String branch;
         private Long identifier;
         private Integer version;
+        private Long schemaVersionId;
+        private Integer protocol;
 
         @Override
         public SchemaIdentifier.Builder name(final String name) {
@@ -116,8 +145,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         }
 
         @Override
+        public SchemaIdentifier.Builder schemaVersionId(final Long schemaVersionId) {
+            this.schemaVersionId = schemaVersionId;
+            return this;
+        }
+
+        @Override
+        public SchemaIdentifier.Builder protocol(final Integer protocol) {
+            this.protocol = protocol;
+            return this;
+        }
+
+        @Override
         public SchemaIdentifier build() {
-            return new StandardSchemaIdentifier(name, identifier, version, branch);
+            return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol == null ? -1 : protocol); // unset protocol -> -1 (as EMPTY), avoids unboxing NPE
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index f4fdfcb..b20d683 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -17,10 +17,6 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -28,16 +24,20 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
     private final Set<SchemaField> schemaFields;
 
     public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
     public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
     public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
-
+    public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
 
     private final SchemaRegistry schemaRegistry;
-
+    static final int LATEST_PROTOCOL_VERSION = 3;
 
     public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
@@ -45,6 +45,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
         schemaFields = new HashSet<>();
         schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
         schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
         schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
     }
 
@@ -57,7 +58,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
         final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
         final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
         final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
+        final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
+        if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
             throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
                 + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
         }
@@ -68,28 +70,34 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
         }
 
         final int protocol = Integer.parseInt(schemaProtocol);
-        if (protocol != 1) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1.");
+        if (protocol > LATEST_PROTOCOL_VERSION) {
+            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                    + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
         }
 
-        if (!isNumber(schemaIdentifier)) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Schema Identifier number");
-        }
+        SchemaIdentifier identifier;
+        if (!isNumber(schemaVersionId)) {
+            if (!isNumber(schemaIdentifier)) {
+                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+                    + schemaProtocol + "', which is not a valid Schema Identifier number");
+            }
 
-        if (!isNumber(schemaVersion)) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
-                + schemaProtocol + "', which is not a valid Schema Version number");
-        }
+            if (!isNumber(schemaVersion)) {
+                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+                    + schemaProtocol + "', which is not a valid Schema Version number");
+            }
 
-        final long schemaId = Long.parseLong(schemaIdentifier);
-        final int version = Integer.parseInt(schemaVersion);
+            final long schemaId = Long.parseLong(schemaIdentifier);
+            final int version = Integer.parseInt(schemaVersion);
+            identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
+        } else {
+            final long svi = Long.parseLong(schemaVersionId);
+            identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
+        }
 
-        final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
         final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
         if (schema == null) {
-            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
+            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + identifier.toString() + "'");
         }
 
         return schema;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index bea2f96..ad4558f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -17,9 +17,6 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.EnumSet;
@@ -27,9 +24,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    static final int LATEST_PROTOCOL_VERSION = 1;
+    static final int LATEST_PROTOCOL_VERSION = 3;
     static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
 
     @Override
@@ -46,23 +46,30 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
 
         attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
         attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
 
         if (id.getBranch().isPresent()) {
             attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
         }
 
+        if (id.getSchemaVersionId().isPresent()) {
+            attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
+        }
+
         return attributes;
     }
 
     @Override
     public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
-        final SchemaIdentifier id = schema.getIdentifier();
-        if (!id.getIdentifier().isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
-        }
-        if (!id.getVersion().isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+        final SchemaIdentifier identifier = schema.getIdentifier();
+
+        if (!identifier.getSchemaVersionId().isPresent()) { // a schema version id alone is sufficient; otherwise both id and version are required
+            if (!identifier.getIdentifier().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
+            }
+            if (!identifier.getVersion().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 077016a..8f3c1b4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -17,11 +17,6 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.stream.io.StreamUtils;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -30,8 +25,13 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
 public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
-    private static final int LATEST_PROTOCOL_VERSION = 1;
+    private static final int LATEST_PROTOCOL_VERSION = 3;
 
     private final Set<SchemaField> schemaFields;
     private final SchemaRegistry schemaRegistry;
@@ -47,28 +47,67 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
 
     @Override
     public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-        final byte[] buffer = new byte[13];
+        final byte[] buffer = new byte[1];
         try {
             StreamUtils.fillBuffer(contentStream, buffer);
         } catch (final IOException ioe) {
-            throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe);
+            throw new SchemaNotFoundException("Could not read first byte from stream", ioe);
         }
 
         // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
-        // as it is provided at:
-        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+        // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
         final ByteBuffer bb = ByteBuffer.wrap(buffer);
         final int protocolVersion = bb.get();
-        if (protocolVersion != 1) {
-            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
-        }
+        SchemaIdentifier schemaIdentifier;
+
+        switch(protocolVersion) {
+            case 1:
+                final byte[] bufferv1 = new byte[12];
+
+                try {
+                    StreamUtils.fillBuffer(contentStream, bufferv1);
+                } catch (final IOException ioe) {
+                    throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+                }
+                final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1); // v1 payload: 8-byte schema id followed by 4-byte version
+
+                final long schemaId = bbv1.getLong();
+                final int schemaVersion = bbv1.getInt();
+                schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
+                return schemaRegistry.retrieveSchema(schemaIdentifier);
+
+            case 2:
+                final byte[] bufferv2 = new byte[8];
 
-        final long schemaId = bb.getLong();
-        final int schemaVersion = bb.getInt();
+                try {
+                    StreamUtils.fillBuffer(contentStream, bufferv2);
+                } catch (final IOException ioe) {
+                    throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+                }
+                final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2); // v2 payload: 8-byte schema version id
 
-        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
-        return schemaRegistry.retrieveSchema(schemaIdentifier);
+                final long sviLong = bbv2.getLong();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
+                return schemaRegistry.retrieveSchema(schemaIdentifier);
+
+            case 3:
+                final byte[] bufferv3 = new byte[4];
+
+                try {
+                    StreamUtils.fillBuffer(contentStream, bufferv3);
+                } catch (final IOException ioe) {
+                    throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+                }
+                final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3); // v3 payload: 4-byte schema version id
+
+                final int sviInt = bbv3.getInt();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
+                return schemaRegistry.retrieveSchema(schemaIdentifier);
+
+            default:
+                throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
+        }
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index cb4ed4e..99dbd1f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,38 +17,57 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
 import java.util.Set;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 1;
+    private static final int LATEST_PROTOCOL_VERSION = 3;
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
         final SchemaIdentifier identifier = schema.getIdentifier();
-        final Long id = identifier.getIdentifier().getAsLong();
-        final Integer version = identifier.getVersion().getAsInt();
 
-        // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
-        // as it is provided at:
-        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
-        final ByteBuffer bb = ByteBuffer.allocate(13);
-        bb.put((byte) LATEST_PROTOCOL_VERSION);
-        bb.putLong(id);
-        bb.putInt(version);
+        // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
+        // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
+        switch(identifier.getProtocol()) {
+            case 1:
+                final Long id = identifier.getIdentifier().getAsLong();
+                final Integer version = identifier.getVersion().getAsInt();
+                final ByteBuffer bbv1 = ByteBuffer.allocate(13);
+                bbv1.put((byte) 1);
+                bbv1.putLong(id);
+                bbv1.putInt(version);
+                out.write(bbv1.array());
+                return;
+            case 2:
+                final Long sviV2 = identifier.getSchemaVersionId().getAsLong(); // v2 carries the schema version id (as a long), not the schema id
+                final ByteBuffer bbv2 = ByteBuffer.allocate(9);
+                bbv2.put((byte) 2);
+                bbv2.putLong(sviV2);
+                out.write(bbv2.array());
+                return;
+            case 3:
+                final Long sviV3 = identifier.getSchemaVersionId().getAsLong(); // v3 carries the schema version id narrowed to an int
+                final ByteBuffer bbv3 = ByteBuffer.allocate(5);
+                bbv3.put((byte) 3);
+                bbv3.putInt(sviV3.intValue());
+                out.write(bbv3.array());
+                return;
+            default:
+                throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
+        }
+
 
-        out.write(bb.array());
     }
 
     @Override
@@ -59,14 +78,14 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
     @Override
     public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
         final SchemaIdentifier identifier = schema.getIdentifier();
-        final OptionalLong identifierOption = identifier.getIdentifier();
-        if (!identifierOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
-        }
 
-        final OptionalInt versionOption = identifier.getVersion();
-        if (!versionOption.isPresent()) {
-            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+        if(!identifier.getSchemaVersionId().isPresent()) {
+            if (!identifier.getIdentifier().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
+            }
+            if (!identifier.getVersion().isPresent()) {
+                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index 0e42afc..c26c55a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.schemaregistry.hortonworks;
 
 import com.google.common.collect.ImmutableMap;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
 import com.hortonworks.registries.schemaregistry.SchemaMetadata;
 import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
 import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
@@ -64,8 +65,13 @@ import java.util.concurrent.TimeUnit;
 @Tags({"schema", "registry", "avro", "hortonworks", "hwx"})
 @CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry")
 public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
-    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT,
-        SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME,
+            SchemaField.SCHEMA_BRANCH_NAME,
+            SchemaField.SCHEMA_TEXT,
+            SchemaField.SCHEMA_TEXT_FORMAT,
+            SchemaField.SCHEMA_IDENTIFIER,
+            SchemaField.SCHEMA_VERSION,
+            SchemaField.SCHEMA_VERSION_ID);
 
     private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
 
@@ -420,6 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName.get())
                 .branch(schemaBranchName.orElse(null))
                 .version(versionInfo.getVersion())
+                .protocol(schemaIdentifier.getProtocol())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -470,6 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName)
                 .id(schemaId.getAsLong())
                 .version(version.getAsInt())
+                .protocol(schemaIdentifier.getProtocol())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -481,13 +489,49 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
 
     @Override
     public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
-        if (schemaIdentifier.getIdentifier().isPresent()) {
+        if (schemaIdentifier.getSchemaVersionId().isPresent()) {
+            return retrieveSchemaBySchemaVersionId(schemaIdentifier);
+        } else if (schemaIdentifier.getIdentifier().isPresent()) {
             return retrieveSchemaByIdAndVersion(schemaIdentifier);
         } else {
             return retrieveSchemaByName(schemaIdentifier);
         }
     }
 
+    private RecordSchema retrieveSchemaBySchemaVersionId(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
+        final SchemaRegistryClient client = getClient();
+        final OptionalLong schemaVersionId = schemaIdentifier.getSchemaVersionId();
+
+        final SchemaIdVersion svi = new SchemaIdVersion(schemaVersionId.getAsLong());
+
+        final String schemaName;
+        final SchemaVersionInfo versionInfo;
+
+        try {
+            versionInfo = client.getSchemaVersionInfo(svi);
+            schemaName = versionInfo.getName();
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with Schema Version ID '" + schemaVersionId.getAsLong() + "'", e);
+            return null;
+        }
+
+        final String schemaText = versionInfo.getSchemaText();
+
+        final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder()
+                .name(schemaName)
+                .id(versionInfo.getSchemaMetadataId())
+                .version(versionInfo.getVersion())
+                .schemaVersionId(schemaVersionId.getAsLong())
+                .protocol(schemaIdentifier.getProtocol())
+                .build();
+
+        final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
+        return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
+            final Schema schema = new Schema.Parser().parse(schemaText);
+            return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier);
+        });
+    }
+
     private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) {
         final StringBuilder builder = new StringBuilder(baseMessage)
                 .append(" with name '")


[nifi] 02/02: NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry - Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java - Addressing review feedback

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2feeb57159799928bdaebd3bc272f9e924a29a2f
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Thu Mar 5 15:52:08 2020 -0500

    NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry
    - Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
    - Addressing review feedback
    
    This closes #4120.
---
 .../serialization/record/SchemaIdentifier.java     |   9 +-
 .../record/StandardSchemaIdentifier.java           |  25 +---
 .../SchemaRegistryRecordSetWriter.java             |  72 +++++++++--
 ...ortonworksAttributeSchemaReferenceStrategy.java |  71 ++++++-----
 .../HortonworksAttributeSchemaReferenceWriter.java |  77 ++++++++----
 .../HortonworksEncodedSchemaReferenceStrategy.java |  30 ++---
 .../HortonworksEncodedSchemaReferenceWriter.java   |  62 ++++++----
 .../schema/access/HortonworksProtocolVersions.java |  56 +++++++++
 .../access/AbstractSchemaAccessStrategyTest.java   |   6 +-
 ...ortonworksAttributeSchemaReferenceStrategy.java | 110 ++++++++++++++++-
 ...tHortonworksAttributeSchemaReferenceWriter.java | 111 +++++++++++++++--
 ...estHortonworksEncodedSchemaReferenceWriter.java | 136 ++++++++++++++++++++-
 .../hortonworks/HortonworksSchemaRegistry.java     |   5 +-
 .../org/apache/nifi/avro/AvroRecordSetWriter.java  |   2 +-
 .../org/apache/nifi/csv/CSVRecordSetWriter.java    |  16 +--
 .../org/apache/nifi/json/JsonRecordSetWriter.java  |  18 +--
 .../org/apache/nifi/xml/XMLRecordSetWriter.java    |   2 +-
 17 files changed, 639 insertions(+), 169 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index 4fb5371..78cef12 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -48,13 +48,8 @@ public interface SchemaIdentifier {
      */
     Optional<String> getBranch();
 
-    /**
-     * @return the protocol used to get this schema identifier
-     */
-    Integer getProtocol();
 
-
-    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
+    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null);
 
     static Builder builder() {
         return new StandardSchemaIdentifier.Builder();
@@ -75,8 +70,6 @@ public interface SchemaIdentifier {
 
         Builder branch(String branch);
 
-        Builder protocol(Integer protocol);
-
         SchemaIdentifier build();
 
     }
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 0800982..a9f859b 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -27,20 +27,14 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     private final OptionalInt version;
     private final OptionalLong schemaVersionId;
     private final Optional<String> branch;
-    private final int protocol;
 
     StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
-            final Long schemaVersionId, final String branch, final int protocol) {
+            final Long schemaVersionId, final String branch) {
         this.name = Optional.ofNullable(name);
         this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
         this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
         this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
         this.branch = Optional.ofNullable(branch);
-        this.protocol = protocol;
-
-        if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
-            throw new IllegalStateException("Name or Identifier must be provided");
-        }
     }
 
     @Override
@@ -69,11 +63,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     }
 
     @Override
-    public Integer getProtocol() {
-        return protocol;
-    }
-
-    @Override
     public int hashCode() {
         return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
                 + 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
@@ -104,8 +93,7 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
                 + "identifier = " + identifier + ", "
                 + "version = " + version + ", "
                 + "schemaVersionId = " + schemaVersionId + ", "
-                + "branch = " + branch + ", "
-                + "protocol = " + protocol + " ]";
+                + "branch = " + branch + " ]";
     }
 
     /**
@@ -118,7 +106,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         private Long identifier;
         private Integer version;
         private Long schemaVersionId;
-        private Integer protocol;
 
         @Override
         public SchemaIdentifier.Builder name(final String name) {
@@ -151,14 +138,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         }
 
         @Override
-        public SchemaIdentifier.Builder protocol(final Integer protocol) {
-            this.protocol = protocol;
-            return this;
-        }
-
-        @Override
         public SchemaIdentifier build() {
-            return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol);
+            return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch);
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 57ec05a..567d860 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -21,9 +21,12 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
 import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
 import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
@@ -42,6 +45,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
@@ -80,6 +84,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         .identifiesControllerService(RecordSchemaCacheService.class)
         .build();
 
+    static final PropertyDescriptor SCHEMA_PROTOCOL_VERSION = new Builder()
+            .name("schema-protocol-version")
+            .displayName("Schema Protocol Version")
+            .description("The protocol version to be used for Schema Write Strategies that require a protocol version, such as Hortonworks Schema Registry strategies. " +
+                    "Valid protocol versions for Hortonworks Schema Registry are integer values 1, 2, or 3.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
     /**
      * This constant is just a base spec for the actual PropertyDescriptor.
      * As it can be overridden by subclasses with different AllowableValues and default value,
@@ -97,9 +112,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
 
     private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
         SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA));
+
     private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
         SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
 
+    private final Set<String> schemaWriteStrategiesRequiringProtocolVersion = new HashSet<>(Arrays.asList(
+            HWX_CONTENT_ENCODED_SCHEMA.getValue(), HWX_SCHEMA_REF_ATTRIBUTES.getValue()));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -112,6 +130,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
             .allowableValues(strategies)
             .build());
         properties.add(SCHEMA_CACHE);
+        properties.add(SCHEMA_PROTOCOL_VERSION);
         properties.addAll(super.getSupportedPropertyDescriptors());
 
         return properties;
@@ -134,10 +153,18 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
     public void storeSchemaWriteStrategy(final ConfigurationContext context) {
         this.configurationContext = context;
 
+        // If Schema Protocol Version is specified without EL then we can create it up front, otherwise when
+        // EL is present we will re-create it later so we can re-evaluate the EL against the incoming variables
+
         final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
         if (strategy != null) {
             final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
-            this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+
+            final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
+            if (!protocolVersionValue.isExpressionLanguagePresent()) {
+                final int protocolVersion = context.getProperty(SCHEMA_PROTOCOL_VERSION).asInteger();
+                this.schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
+            }
         }
     }
 
@@ -146,7 +173,27 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         return configurationContext;
     }
 
-    protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema) throws SchemaNotFoundException {
+    protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema, final Map<String,String> variables) throws SchemaNotFoundException {
+        // If Schema Protocol Version is using expression language, then we reevaluate against the passed in variables
+        final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
+        if (protocolVersionValue.isExpressionLanguagePresent()) {
+            final int protocolVersion;
+            final String protocolVersionString = protocolVersionValue.evaluateAttributeExpressions(variables).getValue();
+            try {
+                protocolVersion = Integer.parseInt(protocolVersionString);
+            } catch (NumberFormatException nfe) {
+                throw new SchemaNotFoundException("Unable to create Schema Write Strategy because " + SCHEMA_PROTOCOL_VERSION.getDisplayName()
+                        + " must be a positive integer, but was '" + protocolVersionString + "'", nfe);
+            }
+
+            // Now recreate the SchemaAccessWriter since we may have a new value for Schema Protocol Version
+            final String strategy = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+            if (strategy != null) {
+                final RecordSchemaCacheService recordSchemaCacheService = getConfigurationContext().getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
+                schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
+            }
+        }
+
         schemaAccessWriter.validateSchema(schema);
         return schemaAccessWriter;
     }
@@ -164,8 +211,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         return schemaAccessWriter;
     }
 
-    private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final RecordSchemaCacheService recordSchemaCacheService) {
-        final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy);
+    private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final Integer protocolVersion, final RecordSchemaCacheService recordSchemaCacheService) {
+        final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, protocolVersion);
         if (recordSchemaCacheService == null) {
             return writer;
         } else {
@@ -173,15 +220,15 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         }
     }
 
-    private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy) {
+    private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final Integer protocolVersion) {
         if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
             return new SchemaNameAsAttribute();
         } else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
             return new WriteAvroSchemaAttributeStrategy();
         } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
-            return new HortonworksEncodedSchemaReferenceWriter();
+            return new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
         } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
-            return new HortonworksAttributeSchemaReferenceWriter();
+            return new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
         } else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
             return new ConfluentSchemaRegistryWriter();
         } else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
@@ -221,6 +268,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
                 .build());
         }
 
+        final String schemaWriteStrategy = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+        final String protocolVersion = validationContext.getProperty(SCHEMA_PROTOCOL_VERSION).getValue();
+
+        if (schemaWriteStrategy != null && schemaWriteStrategiesRequiringProtocolVersion.contains(schemaWriteStrategy) && protocolVersion == null) {
+            results.add(new ValidationResult.Builder()
+                    .subject(SCHEMA_PROTOCOL_VERSION.getDisplayName())
+                    .valid(false)
+                    .explanation("The configured Schema Write Strategy requires a Schema Protocol Version to be specified.")
+                    .build());
+        }
+
         return results;
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index b20d683..cb03778 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -24,10 +28,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
     private final Set<SchemaField> schemaFields;
 
@@ -37,7 +37,6 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
     public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
 
     private final SchemaRegistry schemaRegistry;
-    static final int LATEST_PROTOCOL_VERSION = 3;
 
     public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
@@ -55,44 +54,52 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
 
     @Override
     public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-        final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
-        final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
         final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
-        if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
-                + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        }
-
         if (!isNumber(schemaProtocol)) {
             throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
                 + schemaProtocol + "', which is not a valid Protocol Version number");
         }
 
         final int protocol = Integer.parseInt(schemaProtocol);
-        if (protocol > LATEST_PROTOCOL_VERSION) {
-            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                    + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
+        if (protocol < HortonworksProtocolVersions.MIN_VERSION || protocol > HortonworksProtocolVersions.MAX_VERSION) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+                    + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be a value between "
+                    + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
         }
 
         SchemaIdentifier identifier;
-        if (!isNumber(schemaVersionId)) {
-            if (!isNumber(schemaIdentifier)) {
-                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
-                    + schemaProtocol + "', which is not a valid Schema Identifier number");
-            }
-
-            if (!isNumber(schemaVersion)) {
-                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
-                    + schemaProtocol + "', which is not a valid Schema Version number");
-            }
 
-            final long schemaId = Long.parseLong(schemaIdentifier);
-            final int version = Integer.parseInt(schemaVersion);
-            identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
-        } else {
-            final long svi = Long.parseLong(schemaVersionId);
-            identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
+        switch (protocol) {
+            case 1:
+                final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
+                if (!isNumber(schemaIdentifier)) {
+                    throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+                            + schemaIdentifier + "', which is not a valid Schema Identifier and is required by Protocol Version " + protocol);
+                }
+
+                final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
+                if (!isNumber(schemaVersion)) {
+                    throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+                            + schemaVersion + "', which is not a valid Schema Version and is required by Protocol Version " + protocol);
+                }
+
+                final long schemaId = Long.parseLong(schemaIdentifier);
+                final int version = Integer.parseInt(schemaVersion);
+                identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
+                break;
+            case 2:
+            case 3:
+                final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
+                if (!isNumber(schemaVersionId)) {
+                    throw new SchemaNotFoundException("Could not determine schema because " + SCHEMA_VERSION_ID_ATTRIBUTE + " has a value of '"
+                            + schemaVersionId + "', which is not a valid Schema Version Identifier and is required by Protocol Version " + protocol);
+                }
+
+                final long svi = Long.parseLong(schemaVersionId);
+                identifier = SchemaIdentifier.builder().schemaVersionId(svi).build();
+                break;
+            default:
+                throw new SchemaNotFoundException("Unknown Protocol Version: " + protocol);
         }
 
         final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index ad4558f..24de462 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -17,21 +17,30 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
-    private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    static final int LATEST_PROTOCOL_VERSION = 3;
+
     static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
 
+    private final int protocolVersion;
+
+    /** Creates a writer bound to one protocol version; throws IllegalArgumentException unless it lies in [MIN_VERSION, MAX_VERSION]. */
+    public HortonworksAttributeSchemaReferenceWriter(final int protocolVersion) {
+        if (!(protocolVersion >= HortonworksProtocolVersions.MIN_VERSION && protocolVersion <= HortonworksProtocolVersions.MAX_VERSION)) {
+            throw new IllegalArgumentException("Unknown Protocol Version '" + protocolVersion + "'. Protocol Version must be a value between "
+                    + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
+        }
+        this.protocolVersion = protocolVersion;
+    }
+
     @Override
     public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
     }
@@ -41,21 +50,29 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
         final Map<String, String> attributes = new HashMap<>(4);
         final SchemaIdentifier id = schema.getIdentifier();
 
-        final Long schemaId = id.getIdentifier().getAsLong();
-        final Integer schemaVersion = id.getVersion().getAsInt();
+        switch (protocolVersion) {
+            case 1: // protocol 1 publishes the schema id and the schema version
+                final long schemaId = id.getIdentifier().getAsLong();
+                final int schemaVersion = id.getVersion().getAsInt();
+                attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+                attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
+                break;
+            case 2:
+            case 3: // protocols 2 and 3 publish only the schema version id
+                final long schemaVersionId = id.getSchemaVersionId().getAsLong();
+                attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+                break;
+            default:
+                // Unreachable: the constructor rejects protocol versions outside MIN_VERSION..MAX_VERSION
+                throw new IllegalStateException("Unknown Protocol Version: " + protocolVersion);
+        }
 
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocolVersion));
 
         if (id.getBranch().isPresent()) {
             attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
         }
 
-        if (id.getSchemaVersionId().isPresent()) {
-            attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
-        }
-
         return attributes;
     }
 
@@ -63,19 +80,33 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
     public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
         final SchemaIdentifier identifier = schema.getIdentifier();
 
-        if(!identifier.getSchemaVersionId().isPresent()) {
-            if (!identifier.getIdentifier().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
-            }
-            if (!identifier.getVersion().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
-            }
+        switch (protocolVersion) {
+            case 1: // protocol 1 needs both the schema id and the schema version
+                if (!identifier.getIdentifier().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                if (!identifier.getVersion().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            case 2:
+            case 3: // protocols 2 and 3 need only the schema version id
+                if (!identifier.getSchemaVersionId().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            default:
+                // Unreachable: the constructor rejects protocol versions outside MIN_VERSION..MAX_VERSION
+                throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
         }
     }
 
     @Override
     public Set<SchemaField> getRequiredSchemaFields() {
-        return requiredSchemaFields;
+        return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion); // looked up by this writer's protocol version
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 8f3c1b4..137cee7 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -17,6 +17,11 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -25,13 +30,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.stream.io.StreamUtils;
-
 public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
-    private static final int LATEST_PROTOCOL_VERSION = 3;
 
     private final Set<SchemaField> schemaFields;
     private final SchemaRegistry schemaRegistry;
@@ -42,6 +41,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
         schemaFields = new HashSet<>();
         schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
         schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
         schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
     }
 
@@ -58,6 +58,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
         // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
         final ByteBuffer bb = ByteBuffer.wrap(buffer);
         final int protocolVersion = bb.get();
+
         SchemaIdentifier schemaIdentifier;
 
         switch(protocolVersion) {
@@ -69,11 +70,11 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
                 } catch (final IOException ioe) {
                     throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
                 }
-                final ByteBuffer bbv1 = ByteBuffer.wrap(buffer);
+                final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1);
 
                 final long schemaId = bbv1.getLong();
                 final int schemaVersion = bbv1.getInt();
-                schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
+                schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
                 return schemaRegistry.retrieveSchema(schemaIdentifier);
 
             case 2:
@@ -84,10 +85,10 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
                 } catch (final IOException ioe) {
                     throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
                 }
-                final ByteBuffer bbv2 = ByteBuffer.wrap(buffer);
+                final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2);
 
                 final long sviLong = bbv2.getLong();
-                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).build();
                 return schemaRegistry.retrieveSchema(schemaIdentifier);
 
             case 3:
@@ -98,15 +99,16 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
                 } catch (final IOException ioe) {
                     throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
                 }
-                final ByteBuffer bbv3 = ByteBuffer.wrap(buffer);
+                final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3);
 
                 final int sviInt = bbv3.getInt();
-                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).build();
                 return schemaRegistry.retrieveSchema(schemaIdentifier);
 
             default:
-                throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
+                throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. Expected Protocol Version to be a value between "
+                        + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION
+                        + ", but data was encoded with protocol version " + protocolVersion + ".");
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index 99dbd1f..504515a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,20 +17,28 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
-    private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 3;
+
+    private final int protocolVersion;
+
+    /** Creates a writer bound to one protocol version; throws IllegalArgumentException unless it lies in [MIN_VERSION, MAX_VERSION]. */
+    public HortonworksEncodedSchemaReferenceWriter(final int protocolVersion) {
+        if (!(protocolVersion >= HortonworksProtocolVersions.MIN_VERSION && protocolVersion <= HortonworksProtocolVersions.MAX_VERSION)) {
+            throw new IllegalArgumentException("Unknown Protocol Version '" + protocolVersion + "'. Protocol Version must be a value between "
+                    + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
+        }
+        this.protocolVersion = protocolVersion;
+    }
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@@ -38,7 +46,7 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
 
         // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
         // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
-        switch(identifier.getProtocol()) {
+        switch(protocolVersion) {
             case 1:
                 final Long id = identifier.getIdentifier().getAsLong();
                 final Integer version = identifier.getVersion().getAsInt();
@@ -49,25 +57,23 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
                 out.write(bbv1.array());
                 return;
             case 2:
-                final Long sviV2 = identifier.getIdentifier().getAsLong();
+                final Long sviV2 = identifier.getSchemaVersionId().getAsLong();
                 final ByteBuffer bbv2 = ByteBuffer.allocate(9);
                 bbv2.put((byte) 2);
                 bbv2.putLong(sviV2);
                 out.write(bbv2.array());
                 return;
             case 3:
-                final Long sviV3 = identifier.getIdentifier().getAsLong();
+                final Long sviV3 = identifier.getSchemaVersionId().getAsLong();
                 final ByteBuffer bbv3 = ByteBuffer.allocate(5);
                 bbv3.put((byte) 3);
                 bbv3.putInt(sviV3.intValue());
                 out.write(bbv3.array());
                 return;
             default:
-                throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
+                // Can't reach this point
+                throw new IllegalStateException("Unknown Protocol Version: " + this.protocolVersion);
         }
-
-
     }
 
     @Override
@@ -79,19 +85,33 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
     public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
         final SchemaIdentifier identifier = schema.getIdentifier();
 
-        if(!identifier.getSchemaVersionId().isPresent()) {
-            if (!identifier.getIdentifier().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
-            }
-            if (!identifier.getVersion().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
-            }
+        switch (protocolVersion) {
+            case 1: // protocol 1 needs both the schema id and the schema version
+                if (!identifier.getIdentifier().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                if (!identifier.getVersion().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            case 2:
+            case 3: // protocols 2 and 3 need only the schema version id
+                if (!identifier.getSchemaVersionId().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            default:
+                // Unreachable: the constructor rejects protocol versions outside MIN_VERSION..MAX_VERSION
+                throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
         }
     }
 
     @Override
     public Set<SchemaField> getRequiredSchemaFields() {
-        return requiredSchemaFields;
+        return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion); // looked up by this writer's protocol version
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java
new file mode 100644
index 0000000..0fe0e83
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Constants related to Protocol Versions for Hortonworks Schema Registry, including the schema fields each version requires.
+ */
+public class HortonworksProtocolVersions {
+
+    /** The minimum valid protocol version. */
+    public static final int MIN_VERSION = 1;
+
+    /** The maximum valid protocol version. */
+    public static final int MAX_VERSION = 3;
+
+    /** Unmodifiable map from protocol version to the unmodifiable set of schema fields required by that version. */
+    private static final Map<Integer, Set<SchemaField>> REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL;
+    static {
+        final Map<Integer, Set<SchemaField>> requiredFieldsByProtocol = new HashMap<>();
+        // The sets are shared by every caller, so each is wrapped read-only to keep this static state from being mutated.
+        requiredFieldsByProtocol.put(1, Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION)));
+        requiredFieldsByProtocol.put(2, Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_VERSION_ID)));
+        requiredFieldsByProtocol.put(3, Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_VERSION_ID)));
+        REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL = Collections.unmodifiableMap(requiredFieldsByProtocol);
+    }
+
+    /**
+     * Returns the schema fields that a schema must supply for the given protocol version.
+     * @param protocolVersion the protocol version to look up
+     * @return an unmodifiable set of the required fields, or {@code null} if there is no entry for the given version
+     */
+    public static Set<SchemaField> getRequiredSchemaFields(final Integer protocolVersion) {
+        return REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL.get(protocolVersion);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
index f613ab8..f9dbb20 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
@@ -42,7 +42,11 @@ public class AbstractSchemaAccessStrategyTest {
         fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
 
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
-                .name("person").branch("master").version(1).id(1L).build();
+                .name("person")
+                .branch("master")
+                .version(1)
+                .id(1L)
+                .build();
 
         this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier);
     }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
index a651a06..f41ccdf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.when;
 public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
 
     @Test
-    public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException {
+    public void testGetSchemaWithValidSchemaIdVersionAndProtocol() throws IOException, SchemaNotFoundException {
         final long schemaId = 123456;
         final int version = 2;
         final int protocol = 1;
@@ -56,9 +56,115 @@ public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSch
         assertNotNull(retrievedSchema);
     }
 
+    @Test
+    public void testGetSchemaWithValidSchemaVersionIdAndProtocol() throws IOException, SchemaNotFoundException {
+        final long schemaVersionId = 9999;
+        final int protocol = 2; // under protocol 2 the strategy builds the identifier from the schema version id attribute
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+                .schemaVersionId(schemaVersionId)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test
+    public void testGetSchemaWithAllAttributes() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+        final int protocol = 2;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        // With protocol 2 only the schema version id is expected in the identifier; the schema id and version attributes are not used
+        final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+                .schemaVersionId(schemaVersionId)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
     @Test(expected = SchemaNotFoundException.class)
-    public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException {
+    public void testGetSchemaMissingAllAttributes() throws IOException, SchemaNotFoundException {
         final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
         schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
     }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaMissingProtocol() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        // The protocol version attribute is deliberately left out; the test expects SchemaNotFoundException
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, "INVALID_PROTOCOL"); // non-numeric protocol value
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaNotFound() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+        final int protocol = 2;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        // Protocol 2 looks the schema up by schema version id; the registry is stubbed to return null for that identifier
+        final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+                .schemaVersionId(schemaVersionId)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(null);
+
+        schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
index ea3be57..dd7e034 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
@@ -31,29 +31,75 @@ import java.util.Map;
 public class TestHortonworksAttributeSchemaReferenceWriter {
 
     @Test
-    public void testValidateWithValidSchema() throws SchemaNotFoundException {
+    public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException { // passes only if validateSchema throws nothing
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1); // protocol 1; the identifier above carries id and version only
         schemaAccessWriter.validateSchema(recordSchema);
     }
 
     @Test(expected = SchemaNotFoundException.class)
-    public void testValidateWithInvalidSchema() throws SchemaNotFoundException {
+    public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException { // identifier below has a name but neither id nor version
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1); // protocol 1
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build(); // NOTE(review): id is set but version is not; the valid protocol-1 case sets no name either, so the missing piece looks like the schema version rather than the name - confirm and consider renaming
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1); // protocol 1
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    @Test
+    public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException { // passes only if validateSchema throws nothing
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // only the schema version id is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2); // protocol 2
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only; no schema version id
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2); // protocol 2
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    @Test
+    public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException { // passes only if validateSchema throws nothing
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // only the schema version id is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3); // protocol 3
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only; no schema version id
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3); // protocol 3
         schemaAccessWriter.validateSchema(recordSchema);
     }
 
     @Test
-    public void testGetAttributesWithoutBranch() {
+    public void testGetAttributesWithProtocol1() {
+        final Integer protocolVersion = 1;
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
         final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
 
         Assert.assertEquals(3, attributes.size());
@@ -64,16 +110,17 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
         Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
 
-        Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+        Assert.assertEquals(String.valueOf(protocolVersion),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
     }
 
     @Test
-    public void testGetAttributesWithBranch() {
+    public void testGetAttributesWithProtocol1AndBranch() {
+        final Integer protocolVersion = 1;
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
         final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
 
         Assert.assertEquals(4, attributes.size());
@@ -84,10 +131,52 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
         Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
 
-        Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+        Assert.assertEquals(String.valueOf(protocolVersion),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
 
-        Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+        Assert.assertEquals(schemaIdentifier.getBranch().get(),
+                attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+    }
+
+    @Test
+    public void testGetAttributesWithProtocol2() { // checks the attribute map produced for a protocol-2 writer
+        final Integer protocolVersion = 2;
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // only the schema version id is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
+        final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
+
+        Assert.assertEquals(2, attributes.size()); // exactly the two attributes asserted below: protocol version and schema version id
+
+        Assert.assertEquals(String.valueOf(protocolVersion),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+        Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
+    }
+
+    @Test
+    public void testGetAttributesWithProtocol3() { // checks the attribute map produced for a protocol-3 writer
+        final Integer protocolVersion = 3;
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // only the schema version id is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
+        final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
+
+        Assert.assertEquals(2, attributes.size()); // exactly the two attributes asserted below: protocol version and schema version id
+
+        Assert.assertEquals(String.valueOf(protocolVersion),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+        Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidProtocolVersion() { // only the constructor runs, so it must be the one rejecting the value
+        new HortonworksAttributeSchemaReferenceWriter(99); // 99 is outside the versions exercised elsewhere in this class (1, 2, 3)
     }
 
     private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
index b0589ea..524c8b2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.schema.access;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.junit.Test;
@@ -26,17 +28,71 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
 public class TestHortonworksEncodedSchemaReferenceWriter {
 
     @Test
-    public void testHeader() throws IOException {
-        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter();
+    public void testEncodeProtocolVersion1() throws IOException { // header read back below: protocol byte, long schema id, int schema version
+        final long id = 48;
+        final int version = 2;
+        final int protocolVersion = 1;
 
-        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build());
+        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+                SchemaIdentifier.builder().name("name").id(id).version(version).build());
+
+        final byte[] header;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            writer.writeHeader(schema, baos); // capture exactly the bytes the writer emits for this schema
+            header = baos.toByteArray();
+        }
+
+        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
+            assertEquals(protocolVersion, dis.read()); // verify 'protocol version' (a single byte)
+            assertEquals(id, dis.readLong()); // verify schema id (8-byte long)
+            assertEquals(version, dis.readInt()); // verify schema version (4-byte int)
+            assertEquals(-1, dis.read()); // no more bytes
+        }
+    }
+
+    @Test
+    public void testEncodeProtocolVersion2() throws IOException { // header read back below: protocol byte, then long schema version id
+        final long schemaVersionId = 123;
+        final int protocolVersion = 2;
+
+        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+                SchemaIdentifier.builder().schemaVersionId(schemaVersionId).build());
+
+        final byte[] header;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            writer.writeHeader(schema, baos); // capture exactly the bytes the writer emits for this schema
+            header = baos.toByteArray();
+        }
+
+        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
+            assertEquals(protocolVersion, dis.read()); // verify 'protocol version' (a single byte)
+            assertEquals(schemaVersionId, dis.readLong()); // verify schema version id (8-byte long)
+            assertEquals(-1, dis.read()); // no more bytes
+        }
+    }
+
+    @Test
+    public void testEncodeProtocolVersion3() throws IOException {
+        final int schemaVersionId = 123;
+        final int protocolVersion = 3;
+
+        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+                SchemaIdentifier.builder().schemaVersionId((long)schemaVersionId).build());
 
         final byte[] header;
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
@@ -45,11 +101,79 @@ public class TestHortonworksEncodedSchemaReferenceWriter {
         }
 
         try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
-            assertEquals(1, dis.read()); // verify 'protocol version'
-            assertEquals(48, dis.readLong()); // verify schema id
-            assertEquals(2, dis.readInt()); // verify schema version
+            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+            assertEquals(schemaVersionId, dis.readInt()); // verify schema version id
             assertEquals(-1, dis.read()); // no more bytes
         }
     }
 
+    @Test
+    public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException { // passes only if validateSchema throws nothing
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); // id and version set; no name
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1); // protocol 1
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only; neither id nor version
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1); // protocol 1
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build(); // NOTE(review): id is set but version is not; the valid protocol-1 case sets no name either, so the missing piece looks like the schema version rather than the name - confirm and consider renaming
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1); // protocol 1
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    @Test
+    public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException { // passes only if validateSchema throws nothing
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // only the schema version id is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2); // protocol 2
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only; no schema version id
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2); // protocol 2
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    @Test
+    public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException { // passes only if validateSchema throws nothing
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // only the schema version id is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3); // protocol 3
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only; no schema version id
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3); // protocol 3
+        schemaAccessWriter.validateSchema(recordSchema); // the expected SchemaNotFoundException must come from this call
+    }
+
+    private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) { // test fixture: fixed two-field schema tagged with the given identifier
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); // both fields are STRING-typed
+        fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        return new SimpleRecordSchema(fields, schemaIdentifier);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index c26c55a..317c5e6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -426,7 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName.get())
                 .branch(schemaBranchName.orElse(null))
                 .version(versionInfo.getVersion())
-                .protocol(schemaIdentifier.getProtocol())
+                .schemaVersionId(versionInfo.getId())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -477,7 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName)
                 .id(schemaId.getAsLong())
                 .version(version.getAsInt())
-                .protocol(schemaIdentifier.getProtocol())
+                .schemaVersionId(versionInfo.getId())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -522,7 +522,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .id(versionInfo.getSchemaMetadataId())
                 .version(versionInfo.getVersion())
                 .schemaVersionId(schemaVersionId.getAsLong())
-                .protocol(schemaIdentifier.getProtocol())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 5730ee3..08c0887 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -148,7 +148,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
             if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
                 return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat));
             } else {
-                return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger());
+                return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema, variables), out, encoderPool, getLogger());
             }
         } catch (final SchemaNotFoundException e) {
             throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index 9326d8a..ca934a7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -17,13 +17,6 @@
 
 package org.apache.nifi.csv;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -37,6 +30,13 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 @Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
 @CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
     + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
@@ -92,7 +92,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
             csvFormat = CSVUtils.createCSVFormat(context, variables);
         }
 
-        return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
+        return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index 864574f..21ff5c6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -17,14 +17,6 @@
 
 package org.apache.nifi.json;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -49,6 +41,14 @@ import org.tukaani.xz.XZOutputStream;
 import org.xerial.snappy.SnappyFramedOutputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 @Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
 @CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
         + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
@@ -210,7 +210,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
             throw new IOException(e);
         }
 
-        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
+        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping,
                 getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
index 8787498..6ad7006 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
@@ -203,7 +203,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R
 
         final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue();
 
-        return new WriteXMLResult(schema, getSchemaAccessWriter(schema),
+        return new WriteXMLResult(schema, getSchemaAccessWriter(schema, variables),
                 out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet,
                 getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
     }