You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/20 14:57:13 UTC

[nifi] 02/02: NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry - Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java - Addressing review feedback

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2feeb57159799928bdaebd3bc272f9e924a29a2f
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Thu Mar 5 15:52:08 2020 -0500

    NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry
    - Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
    - Addressing review feedback
    
    This closes #4120.
---
 .../serialization/record/SchemaIdentifier.java     |   9 +-
 .../record/StandardSchemaIdentifier.java           |  25 +---
 .../SchemaRegistryRecordSetWriter.java             |  72 +++++++++--
 ...ortonworksAttributeSchemaReferenceStrategy.java |  71 ++++++-----
 .../HortonworksAttributeSchemaReferenceWriter.java |  77 ++++++++----
 .../HortonworksEncodedSchemaReferenceStrategy.java |  30 ++---
 .../HortonworksEncodedSchemaReferenceWriter.java   |  62 ++++++----
 .../schema/access/HortonworksProtocolVersions.java |  56 +++++++++
 .../access/AbstractSchemaAccessStrategyTest.java   |   6 +-
 ...ortonworksAttributeSchemaReferenceStrategy.java | 110 ++++++++++++++++-
 ...tHortonworksAttributeSchemaReferenceWriter.java | 111 +++++++++++++++--
 ...estHortonworksEncodedSchemaReferenceWriter.java | 136 ++++++++++++++++++++-
 .../hortonworks/HortonworksSchemaRegistry.java     |   5 +-
 .../org/apache/nifi/avro/AvroRecordSetWriter.java  |   2 +-
 .../org/apache/nifi/csv/CSVRecordSetWriter.java    |  16 +--
 .../org/apache/nifi/json/JsonRecordSetWriter.java  |  18 +--
 .../org/apache/nifi/xml/XMLRecordSetWriter.java    |   2 +-
 17 files changed, 639 insertions(+), 169 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index 4fb5371..78cef12 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -48,13 +48,8 @@ public interface SchemaIdentifier {
      */
     Optional<String> getBranch();
 
-    /**
-     * @return the protocol used to get this schema identifier
-     */
-    Integer getProtocol();
 
-
-    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
+    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null);
 
     static Builder builder() {
         return new StandardSchemaIdentifier.Builder();
@@ -75,8 +70,6 @@ public interface SchemaIdentifier {
 
         Builder branch(String branch);
 
-        Builder protocol(Integer protocol);
-
         SchemaIdentifier build();
 
     }
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 0800982..a9f859b 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -27,20 +27,14 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     private final OptionalInt version;
     private final OptionalLong schemaVersionId;
     private final Optional<String> branch;
-    private final int protocol;
 
     StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
-            final Long schemaVersionId, final String branch, final int protocol) {
+            final Long schemaVersionId, final String branch) {
         this.name = Optional.ofNullable(name);
         this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
         this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
         this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
         this.branch = Optional.ofNullable(branch);
-        this.protocol = protocol;
-
-        if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
-            throw new IllegalStateException("Name or Identifier must be provided");
-        }
     }
 
     @Override
@@ -69,11 +63,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
     }
 
     @Override
-    public Integer getProtocol() {
-        return protocol;
-    }
-
-    @Override
     public int hashCode() {
         return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
                 + 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
@@ -104,8 +93,7 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
                 + "identifier = " + identifier + ", "
                 + "version = " + version + ", "
                 + "schemaVersionId = " + schemaVersionId + ", "
-                + "branch = " + branch + ", "
-                + "protocol = " + protocol + " ]";
+                + "branch = " + branch + " ]";
     }
 
     /**
@@ -118,7 +106,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         private Long identifier;
         private Integer version;
         private Long schemaVersionId;
-        private Integer protocol;
 
         @Override
         public SchemaIdentifier.Builder name(final String name) {
@@ -151,14 +138,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
         }
 
         @Override
-        public SchemaIdentifier.Builder protocol(final Integer protocol) {
-            this.protocol = protocol;
-            return this;
-        }
-
-        @Override
         public SchemaIdentifier build() {
-            return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol);
+            return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch);
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 57ec05a..567d860 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -21,9 +21,12 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
 import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
 import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
@@ -42,6 +45,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
@@ -80,6 +84,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         .identifiesControllerService(RecordSchemaCacheService.class)
         .build();
 
+    static final PropertyDescriptor SCHEMA_PROTOCOL_VERSION = new Builder()
+            .name("schema-protocol-version")
+            .displayName("Schema Protocol Version")
+            .description("The protocol version to be used for Schema Write Strategies that require a protocol version, such as Hortonworks Schema Registry strategies. " +
+                    "Valid protocol versions for Hortonworks Schema Registry are integer values 1, 2, or 3.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
     /**
      * This constant is just a base spec for the actual PropertyDescriptor.
      * As it can be overridden by subclasses with different AllowableValues and default value,
@@ -97,9 +112,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
 
     private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
         SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA));
+
     private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
         SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
 
+    private final Set<String> schemaWriteStrategiesRequiringProtocolVersion = new HashSet<>(Arrays.asList(
+            HWX_CONTENT_ENCODED_SCHEMA.getValue(), HWX_SCHEMA_REF_ATTRIBUTES.getValue()));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -112,6 +130,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
             .allowableValues(strategies)
             .build());
         properties.add(SCHEMA_CACHE);
+        properties.add(SCHEMA_PROTOCOL_VERSION);
         properties.addAll(super.getSupportedPropertyDescriptors());
 
         return properties;
@@ -134,10 +153,18 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
     public void storeSchemaWriteStrategy(final ConfigurationContext context) {
         this.configurationContext = context;
 
+        // If Schema Protocol Version is specified without EL then we can create it up front, otherwise when
+        // EL is present we will re-create it later so we can re-evaluate the EL against the incoming variables
+
         final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
         if (strategy != null) {
             final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
-            this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+
+            final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
+            if (!protocolVersionValue.isExpressionLanguagePresent()) {
+                final int protocolVersion = context.getProperty(SCHEMA_PROTOCOL_VERSION).asInteger();
+                this.schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
+            }
         }
     }
 
@@ -146,7 +173,27 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         return configurationContext;
     }
 
-    protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema) throws SchemaNotFoundException {
+    protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema, final Map<String,String> variables) throws SchemaNotFoundException {
+        // If Schema Protocol Version is using expression language, then we reevaluate against the passed in variables
+        final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
+        if (protocolVersionValue.isExpressionLanguagePresent()) {
+            final int protocolVersion;
+            final String protocolVersionString = protocolVersionValue.evaluateAttributeExpressions(variables).getValue();
+            try {
+                protocolVersion = Integer.parseInt(protocolVersionString);
+            } catch (NumberFormatException nfe) {
+                throw new SchemaNotFoundException("Unable to create Schema Write Strategy because " + SCHEMA_PROTOCOL_VERSION.getDisplayName()
+                        + " must be a positive integer, but was '" + protocolVersionString + "'", nfe);
+            }
+
+            // Now recreate the SchemaAccessWriter since we may have a new value for Schema Protocol Version
+            final String strategy = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+            if (strategy != null) {
+                final RecordSchemaCacheService recordSchemaCacheService = getConfigurationContext().getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
+                schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
+            }
+        }
+
         schemaAccessWriter.validateSchema(schema);
         return schemaAccessWriter;
     }
@@ -164,8 +211,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         return schemaAccessWriter;
     }
 
-    private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final RecordSchemaCacheService recordSchemaCacheService) {
-        final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy);
+    private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final Integer protocolVersion, final RecordSchemaCacheService recordSchemaCacheService) {
+        final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, protocolVersion);
         if (recordSchemaCacheService == null) {
             return writer;
         } else {
@@ -173,15 +220,15 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
         }
     }
 
-    private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy) {
+    private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final Integer protocolVersion) {
         if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
             return new SchemaNameAsAttribute();
         } else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
             return new WriteAvroSchemaAttributeStrategy();
         } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
-            return new HortonworksEncodedSchemaReferenceWriter();
+            return new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
         } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
-            return new HortonworksAttributeSchemaReferenceWriter();
+            return new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
         } else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
             return new ConfluentSchemaRegistryWriter();
         } else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
@@ -221,6 +268,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
                 .build());
         }
 
+        final String schemaWriteStrategy = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+        final String protocolVersion = validationContext.getProperty(SCHEMA_PROTOCOL_VERSION).getValue();
+
+        if (schemaWriteStrategy != null && schemaWriteStrategiesRequiringProtocolVersion.contains(schemaWriteStrategy) && protocolVersion == null) {
+            results.add(new ValidationResult.Builder()
+                    .subject(SCHEMA_PROTOCOL_VERSION.getDisplayName())
+                    .valid(false)
+                    .explanation("The configured Schema Write Strategy requires a Schema Protocol Version to be specified.")
+                    .build());
+        }
+
         return results;
     }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index b20d683..cb03778 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -24,10 +28,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
     private final Set<SchemaField> schemaFields;
 
@@ -37,7 +37,6 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
     public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
 
     private final SchemaRegistry schemaRegistry;
-    static final int LATEST_PROTOCOL_VERSION = 3;
 
     public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
         this.schemaRegistry = schemaRegistry;
@@ -55,44 +54,52 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
 
     @Override
     public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
-        final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
-        final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
         final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
-        if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
-            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
-                + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
-        }
-
         if (!isNumber(schemaProtocol)) {
             throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
                 + schemaProtocol + "', which is not a valid Protocol Version number");
         }
 
         final int protocol = Integer.parseInt(schemaProtocol);
-        if (protocol > LATEST_PROTOCOL_VERSION) {
-            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                    + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
+        if (protocol < HortonworksProtocolVersions.MIN_VERSION || protocol > HortonworksProtocolVersions.MAX_VERSION) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+                    + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be a value between "
+                    + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
         }
 
         SchemaIdentifier identifier;
-        if (!isNumber(schemaVersionId)) {
-            if (!isNumber(schemaIdentifier)) {
-                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
-                    + schemaProtocol + "', which is not a valid Schema Identifier number");
-            }
-
-            if (!isNumber(schemaVersion)) {
-                throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
-                    + schemaProtocol + "', which is not a valid Schema Version number");
-            }
 
-            final long schemaId = Long.parseLong(schemaIdentifier);
-            final int version = Integer.parseInt(schemaVersion);
-            identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
-        } else {
-            final long svi = Long.parseLong(schemaVersionId);
-            identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
+        switch (protocol) {
+            case 1:
+                final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
+                if (!isNumber(schemaIdentifier)) {
+                    throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+                            + schemaIdentifier + "', which is not a valid Schema Identifier and is required by Protocol Version " + protocol);
+                }
+
+                final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
+                if (!isNumber(schemaVersion)) {
+                    throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+                            + schemaVersion + "', which is not a valid Schema Version and is required by Protocol Version " + protocol);
+                }
+
+                final long schemaId = Long.parseLong(schemaIdentifier);
+                final int version = Integer.parseInt(schemaVersion);
+                identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
+                break;
+            case 2:
+            case 3:
+                final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
+                if (!isNumber(schemaVersionId)) {
+                    throw new SchemaNotFoundException("Could not determine schema because " + SCHEMA_VERSION_ID_ATTRIBUTE + " has a value of '"
+                            + schemaVersionId + "', which is not a valid Schema Version Identifier and is required by Protocol Version " + protocol);
+                }
+
+                final long svi = Long.parseLong(schemaVersionId);
+                identifier = SchemaIdentifier.builder().schemaVersionId(svi).build();
+                break;
+            default:
+                throw new SchemaNotFoundException("Unknown Protocol Version: " + protocol);
         }
 
         final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index ad4558f..24de462 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -17,21 +17,30 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
-    private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    static final int LATEST_PROTOCOL_VERSION = 3;
+
     static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
 
+    private final int protocolVersion;
+
+    public HortonworksAttributeSchemaReferenceWriter(final int protocolVersion) {
+        this.protocolVersion = protocolVersion;
+
+        if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
+            throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
+                    + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
+        }
+    }
+
     @Override
     public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
     }
@@ -41,21 +50,29 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
         final Map<String, String> attributes = new HashMap<>(4);
         final SchemaIdentifier id = schema.getIdentifier();
 
-        final Long schemaId = id.getIdentifier().getAsLong();
-        final Integer schemaVersion = id.getVersion().getAsInt();
+        switch (protocolVersion) {
+            case 1:
+                final Long schemaId = id.getIdentifier().getAsLong();
+                final Integer schemaVersion = id.getVersion().getAsInt();
+                attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+                attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
+                break;
+            case 2:
+            case 3:
+                final Long schemaVersionId = id.getSchemaVersionId().getAsLong();
+                attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+                break;
+            default:
+                // Can't reach this point
+                throw new IllegalStateException("Unknown Protocol Version: " + protocolVersion);
+        }
 
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
-        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocolVersion));
 
         if (id.getBranch().isPresent()) {
             attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
         }
 
-        if (id.getSchemaVersionId().isPresent()) {
-            attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
-        }
-
         return attributes;
     }
 
@@ -63,19 +80,33 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
     public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
         final SchemaIdentifier identifier = schema.getIdentifier();
 
-        if(!identifier.getSchemaVersionId().isPresent()) {
-            if (!identifier.getIdentifier().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
-            }
-            if (!identifier.getVersion().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
-            }
+        switch (protocolVersion) {
+            case 1:
+                if (!identifier.getIdentifier().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                if (!identifier.getVersion().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            case 2:
+            case 3:
+                if (!identifier.getSchemaVersionId().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            default:
+                // Can't reach this point
+                throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
         }
     }
 
     @Override
     public Set<SchemaField> getRequiredSchemaFields() {
-        return requiredSchemaFields;
+        return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 8f3c1b4..137cee7 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -17,6 +17,11 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -25,13 +30,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.stream.io.StreamUtils;
-
 public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
-    private static final int LATEST_PROTOCOL_VERSION = 3;
 
     private final Set<SchemaField> schemaFields;
     private final SchemaRegistry schemaRegistry;
@@ -42,6 +41,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
         schemaFields = new HashSet<>();
         schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
         schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
         schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
     }
 
@@ -58,6 +58,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
         // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
         final ByteBuffer bb = ByteBuffer.wrap(buffer);
         final int protocolVersion = bb.get();
+
         SchemaIdentifier schemaIdentifier;
 
         switch(protocolVersion) {
@@ -69,11 +70,11 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
                 } catch (final IOException ioe) {
                     throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
                 }
-                final ByteBuffer bbv1 = ByteBuffer.wrap(buffer);
+                final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1);
 
                 final long schemaId = bbv1.getLong();
                 final int schemaVersion = bbv1.getInt();
-                schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
+                schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
                 return schemaRegistry.retrieveSchema(schemaIdentifier);
 
             case 2:
@@ -84,10 +85,10 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
                 } catch (final IOException ioe) {
                     throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
                 }
-                final ByteBuffer bbv2 = ByteBuffer.wrap(buffer);
+                final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2);
 
                 final long sviLong = bbv2.getLong();
-                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).build();
                 return schemaRegistry.retrieveSchema(schemaIdentifier);
 
             case 3:
@@ -98,15 +99,16 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
                 } catch (final IOException ioe) {
                     throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
                 }
-                final ByteBuffer bbv3 = ByteBuffer.wrap(buffer);
+                final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3);
 
                 final int sviInt = bbv3.getInt();
-                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
+                schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).build();
                 return schemaRegistry.retrieveSchema(schemaIdentifier);
 
             default:
-                throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
+                throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. Expected Protocol Version to be a value between "
+                        + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION
+                        + ", but data was encoded with protocol version " + protocolVersion + ".");
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index 99dbd1f..504515a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,20 +17,28 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
-    private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 3;
+
+    private final int protocolVersion;
+
+    public HortonworksEncodedSchemaReferenceWriter(final int protocolVersion) {
+        this.protocolVersion = protocolVersion;
+
+        if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
+            throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
+                    + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
+        }
+    }
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@@ -38,7 +46,7 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
 
         // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
         // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
-        switch(identifier.getProtocol()) {
+        switch(protocolVersion) {
             case 1:
                 final Long id = identifier.getIdentifier().getAsLong();
                 final Integer version = identifier.getVersion().getAsInt();
@@ -49,25 +57,23 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
                 out.write(bbv1.array());
                 return;
             case 2:
-                final Long sviV2 = identifier.getIdentifier().getAsLong();
+                final Long sviV2 = identifier.getSchemaVersionId().getAsLong();
                 final ByteBuffer bbv2 = ByteBuffer.allocate(9);
                 bbv2.put((byte) 2);
                 bbv2.putLong(sviV2);
                 out.write(bbv2.array());
                 return;
             case 3:
-                final Long sviV3 = identifier.getIdentifier().getAsLong();
+                final Long sviV3 = identifier.getSchemaVersionId().getAsLong();
                 final ByteBuffer bbv3 = ByteBuffer.allocate(5);
                 bbv3.put((byte) 3);
                 bbv3.putInt(sviV3.intValue());
                 out.write(bbv3.array());
                 return;
             default:
-                throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
-                        + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
+                // Can't reach this point
+                throw new IllegalStateException("Unknown Protocol Version: " + this.protocolVersion);
         }
-
-
     }
 
     @Override
@@ -79,19 +85,33 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
     public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
         final SchemaIdentifier identifier = schema.getIdentifier();
 
-        if(!identifier.getSchemaVersionId().isPresent()) {
-            if (!identifier.getIdentifier().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
-            }
-            if (!identifier.getVersion().isPresent()) {
-                throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
-            }
+        switch (protocolVersion) {
+            case 1:
+                if (!identifier.getIdentifier().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                if (!identifier.getVersion().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            case 2:
+            case 3:
+                if (!identifier.getSchemaVersionId().isPresent()) {
+                    throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version Identifier " +
+                            "is not known and is required for Protocol Version " + protocolVersion);
+                }
+                break;
+            default:
+                // Can't reach this point
+                throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
         }
     }
 
     @Override
     public Set<SchemaField> getRequiredSchemaFields() {
-        return requiredSchemaFields;
+        return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java
new file mode 100644
index 0000000..0fe0e83
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Constants related to Protocol Versions for Hortonworks Schema Registry.
+ */
+public final class HortonworksProtocolVersions {
+
+    /**
+     * The minimum valid protocol version.
+     */
+    public static final int MIN_VERSION = 1;
+
+    /**
+     * The maximum valid protocol version.
+     */
+    public static final int MAX_VERSION = 3;
+
+    /**
+     * Map from protocol version to the required schema fields for the given version.
+     * Both the map and the sets it holds are unmodifiable, so callers cannot alter this shared state.
+     */
+    private static final Map<Integer, Set<SchemaField>> REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL;
+    static {
+        final Map<Integer, Set<SchemaField>> requiredFieldsByProtocol = new HashMap<>();
+        requiredFieldsByProtocol.put(1, Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION)));
+        requiredFieldsByProtocol.put(2, Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_VERSION_ID)));
+        requiredFieldsByProtocol.put(3, Collections.unmodifiableSet(EnumSet.of(SchemaField.SCHEMA_VERSION_ID)));
+        REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL = Collections.unmodifiableMap(requiredFieldsByProtocol);
+    }
+
+    /**
+     * Not instantiable; this class only holds constants and a static lookup.
+     */
+    private HortonworksProtocolVersions() {
+    }
+
+    /**
+     * Returns the schema fields that are required for the given protocol version.
+     *
+     * @param protocolVersion the protocol version to look up
+     * @return an unmodifiable set of the required fields, or an empty set if the version is null or has no entry in the map
+     */
+    public static Set<SchemaField> getRequiredSchemaFields(final Integer protocolVersion) {
+        return REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL.getOrDefault(protocolVersion, Collections.<SchemaField>emptySet());
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
index f613ab8..f9dbb20 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
@@ -42,7 +42,11 @@ public class AbstractSchemaAccessStrategyTest {
         fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
 
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
-                .name("person").branch("master").version(1).id(1L).build();
+                .name("person")
+                .branch("master")
+                .version(1)
+                .id(1L)
+                .build();
 
         this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier);
     }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
index a651a06..f41ccdf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.when;
 public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
 
     @Test
-    public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException {
+    public void testGetSchemaWithValidSchemaIdVersionAndProtocol() throws IOException, SchemaNotFoundException {
         final long schemaId = 123456;
         final int version = 2;
         final int protocol = 1;
@@ -56,9 +56,115 @@ public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSch
         assertNotNull(retrievedSchema);
     }
 
+    @Test
+    public void testGetSchemaWithValidSchemaVersionIdAndProtocol() throws IOException, SchemaNotFoundException {
+        final long schemaVersionId = 9999;
+        final int protocol = 2;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+                .schemaVersionId(schemaVersionId)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test
+    public void testGetSchemaWithAllAttributes() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+        final int protocol = 2;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        // The schema version id should take precedence
+        final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+                .schemaVersionId(schemaVersionId)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
     @Test(expected = SchemaNotFoundException.class)
-    public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException {
+    public void testGetSchemaMissingAllAttributes() throws IOException, SchemaNotFoundException {
         final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
         schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
     }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaMissingProtocol() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, "INVALID_PROTOCOL");
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaNotFound() throws IOException, SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final long schemaVersionId = 9999;
+        final int protocol = 2;
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+        attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        // The schema version id should take precedence
+        final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+                .schemaVersionId(schemaVersionId)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(null);
+
+        schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
index ea3be57..dd7e034 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
@@ -31,29 +31,75 @@ import java.util.Map;
 public class TestHortonworksAttributeSchemaReferenceWriter {
 
     @Test
-    public void testValidateWithValidSchema() throws SchemaNotFoundException {
+    public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
         schemaAccessWriter.validateSchema(recordSchema);
     }
 
     @Test(expected = SchemaNotFoundException.class)
-    public void testValidateWithInvalidSchema() throws SchemaNotFoundException {
+    public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol1AndMissingSchemaVersion() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build(); // id only: no version is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
+        schemaAccessWriter.validateSchema(recordSchema); // protocol 1 requires both id and version
+    }
+
+    @Test
+    public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test
+    public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
         schemaAccessWriter.validateSchema(recordSchema);
     }
 
     @Test
-    public void testGetAttributesWithoutBranch() {
+    public void testGetAttributesWithProtocol1() {
+        final Integer protocolVersion = 1;
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
         final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
 
         Assert.assertEquals(3, attributes.size());
@@ -64,16 +110,17 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
         Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
 
-        Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+        Assert.assertEquals(String.valueOf(protocolVersion),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
     }
 
     @Test
-    public void testGetAttributesWithBranch() {
+    public void testGetAttributesWithProtocol1AndBranch() {
+        final Integer protocolVersion = 1;
         final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
         final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
 
-        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
         final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
 
         Assert.assertEquals(4, attributes.size());
@@ -84,10 +131,52 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
         Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
 
-        Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+        Assert.assertEquals(String.valueOf(protocolVersion),
                 attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
 
-        Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+        Assert.assertEquals(schemaIdentifier.getBranch().get(),
+                attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+    }
+
+    @Test
+    public void testGetAttributesWithProtocol2() {
+        final Integer protocolVersion = 2;
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
+        final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
+
+        Assert.assertEquals(2, attributes.size());
+
+        Assert.assertEquals(String.valueOf(protocolVersion),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+        Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
+    }
+
+    @Test
+    public void testGetAttributesWithProtocol3() {
+        final Integer protocolVersion = 3;
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
+        final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
+
+        Assert.assertEquals(2, attributes.size());
+
+        Assert.assertEquals(String.valueOf(protocolVersion),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+        Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
+                attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidProtocolVersion() {
+        new HortonworksAttributeSchemaReferenceWriter(99);
     }
 
     private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
index b0589ea..524c8b2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
@@ -18,6 +18,8 @@
 package org.apache.nifi.schema.access;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.junit.Test;
@@ -26,17 +28,71 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
 public class TestHortonworksEncodedSchemaReferenceWriter {
 
     @Test
-    public void testHeader() throws IOException {
-        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter();
+    public void testEncodeProtocolVersion1() throws IOException {
+        final long id = 48;
+        final int version = 2;
+        final int protocolVersion = 1;
 
-        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build());
+        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+                SchemaIdentifier.builder().name("name").id(id).version(version).build());
+
+        final byte[] header;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            writer.writeHeader(schema, baos);
+            header = baos.toByteArray();
+        }
+
+        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
+            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+            assertEquals(id, dis.readLong()); // verify schema id
+            assertEquals(version, dis.readInt()); // verify schema version
+            assertEquals(-1, dis.read()); // no more bytes
+        }
+    }
+
+    @Test // protocol 2 header as checked here: one protocol byte followed by the schema version id read as a long
+    public void testEncodeProtocolVersion2() throws IOException {
+        final long schemaVersionId = 123;
+        final int protocolVersion = 2;
+
+        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), // empty field list; the assertions below only inspect identifier data
+                SchemaIdentifier.builder().schemaVersionId(schemaVersionId).build());
+
+        final byte[] header;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            writer.writeHeader(schema, baos); // capture the encoded header bytes in memory
+            header = baos.toByteArray();
+        }
+
+        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
+            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+            assertEquals(schemaVersionId, dis.readLong()); // verify schema version id
+            assertEquals(-1, dis.read()); // no more bytes
+        }
+    }
+
+    @Test
+    public void testEncodeProtocolVersion3() throws IOException {
+        final int schemaVersionId = 123;
+        final int protocolVersion = 3;
+
+        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+                SchemaIdentifier.builder().schemaVersionId((long)schemaVersionId).build());
 
         final byte[] header;
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
@@ -45,11 +101,79 @@ public class TestHortonworksEncodedSchemaReferenceWriter {
         }
 
         try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
-            assertEquals(1, dis.read()); // verify 'protocol version'
-            assertEquals(48, dis.readLong()); // verify schema id
-            assertEquals(2, dis.readInt()); // verify schema version
+            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+            assertEquals(schemaVersionId, dis.readInt()); // verify schema version id
             assertEquals(-1, dis.read()); // no more bytes
         }
     }
 
+    @Test // protocol 1 with both schema id and version set: validateSchema is expected to return without throwing
+    public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); // id + version, no name
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class) // protocol 1 validation must reject an identifier that has no schema id
+    public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only: neither id nor version is set
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class) // protocol 1 validation must reject an identifier that carries only a schema id
+    public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build(); // NOTE(review): id only; the passing protocol-1 case also has no name, so the missing version is presumably the trigger - method name may be misleading, confirm
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test // protocol 2 with a schema version id set: validateSchema is expected to return without throwing
+    public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // schema version id is the only identifier data supplied
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class) // protocol 2 validation must reject an identifier without a schema version id
+    public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only: no schema version id
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test // protocol 3 with a schema version id set: validateSchema is expected to return without throwing
+    public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); // schema version id is the only identifier data supplied
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class) // protocol 3 validation must reject an identifier without a schema version id
+    public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); // name only: no schema version id
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) { // test helper: schema with two STRING fields carrying the given identifier
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+        return new SimpleRecordSchema(fields, schemaIdentifier); // identifier is passed through unchanged
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index c26c55a..317c5e6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -426,7 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName.get())
                 .branch(schemaBranchName.orElse(null))
                 .version(versionInfo.getVersion())
-                .protocol(schemaIdentifier.getProtocol())
+                .schemaVersionId(versionInfo.getId())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -477,7 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .name(schemaName)
                 .id(schemaId.getAsLong())
                 .version(version.getAsInt())
-                .protocol(schemaIdentifier.getProtocol())
+                .schemaVersionId(versionInfo.getId())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -522,7 +522,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
                 .id(versionInfo.getSchemaMetadataId())
                 .version(versionInfo.getVersion())
                 .schemaVersionId(schemaVersionId.getAsLong())
-                .protocol(schemaIdentifier.getProtocol())
                 .build();
 
         final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 5730ee3..08c0887 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -148,7 +148,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
             if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
                 return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat));
             } else {
-                return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger());
+                return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema, variables), out, encoderPool, getLogger());
             }
         } catch (final SchemaNotFoundException e) {
             throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index 9326d8a..ca934a7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -17,13 +17,6 @@
 
 package org.apache.nifi.csv;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -37,6 +30,13 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 @Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
 @CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
     + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
@@ -92,7 +92,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
             csvFormat = CSVUtils.createCSVFormat(context, variables);
         }
 
-        return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
+        return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index 864574f..21ff5c6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -17,14 +17,6 @@
 
 package org.apache.nifi.json;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -49,6 +41,14 @@ import org.tukaani.xz.XZOutputStream;
 import org.xerial.snappy.SnappyFramedOutputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 @Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
 @CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
         + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
@@ -210,7 +210,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
             throw new IOException(e);
         }
 
-        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
+        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping,
                 getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
index 8787498..6ad7006 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
@@ -203,7 +203,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R
 
         final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue();
 
-        return new WriteXMLResult(schema, getSchemaAccessWriter(schema),
+        return new WriteXMLResult(schema, getSchemaAccessWriter(schema, variables),
                 out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet,
                 getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
     }