You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/20 14:57:13 UTC
[nifi] 02/02: NIFI-7221 Support v2 and v3 protocol version for
Hortonworks Schema Registry - Update
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
- Addressing review feedback
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 2feeb57159799928bdaebd3bc272f9e924a29a2f
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Thu Mar 5 15:52:08 2020 -0500
NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry
- Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
- Addressing review feedback
This closes #4120.
---
.../serialization/record/SchemaIdentifier.java | 9 +-
.../record/StandardSchemaIdentifier.java | 25 +---
.../SchemaRegistryRecordSetWriter.java | 72 +++++++++--
...ortonworksAttributeSchemaReferenceStrategy.java | 71 ++++++-----
.../HortonworksAttributeSchemaReferenceWriter.java | 77 ++++++++----
.../HortonworksEncodedSchemaReferenceStrategy.java | 30 ++---
.../HortonworksEncodedSchemaReferenceWriter.java | 62 ++++++----
.../schema/access/HortonworksProtocolVersions.java | 56 +++++++++
.../access/AbstractSchemaAccessStrategyTest.java | 6 +-
...ortonworksAttributeSchemaReferenceStrategy.java | 110 ++++++++++++++++-
...tHortonworksAttributeSchemaReferenceWriter.java | 111 +++++++++++++++--
...estHortonworksEncodedSchemaReferenceWriter.java | 136 ++++++++++++++++++++-
.../hortonworks/HortonworksSchemaRegistry.java | 5 +-
.../org/apache/nifi/avro/AvroRecordSetWriter.java | 2 +-
.../org/apache/nifi/csv/CSVRecordSetWriter.java | 16 +--
.../org/apache/nifi/json/JsonRecordSetWriter.java | 18 +--
.../org/apache/nifi/xml/XMLRecordSetWriter.java | 2 +-
17 files changed, 639 insertions(+), 169 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index 4fb5371..78cef12 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -48,13 +48,8 @@ public interface SchemaIdentifier {
*/
Optional<String> getBranch();
- /**
- * @return the protocol used to get this schema identifier
- */
- Integer getProtocol();
-
- SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
+ SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null);
static Builder builder() {
return new StandardSchemaIdentifier.Builder();
@@ -75,8 +70,6 @@ public interface SchemaIdentifier {
Builder branch(String branch);
- Builder protocol(Integer protocol);
-
SchemaIdentifier build();
}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 0800982..a9f859b 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -27,20 +27,14 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private final OptionalInt version;
private final OptionalLong schemaVersionId;
private final Optional<String> branch;
- private final int protocol;
StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
- final Long schemaVersionId, final String branch, final int protocol) {
+ final Long schemaVersionId, final String branch) {
this.name = Optional.ofNullable(name);
this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
this.branch = Optional.ofNullable(branch);
- this.protocol = protocol;
-
- if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
- throw new IllegalStateException("Name or Identifier must be provided");
- }
}
@Override
@@ -69,11 +63,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
}
@Override
- public Integer getProtocol() {
- return protocol;
- }
-
- @Override
public int hashCode() {
return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
+ 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
@@ -104,8 +93,7 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
+ "identifier = " + identifier + ", "
+ "version = " + version + ", "
+ "schemaVersionId = " + schemaVersionId + ", "
- + "branch = " + branch + ", "
- + "protocol = " + protocol + " ]";
+ + "branch = " + branch + " ]";
}
/**
@@ -118,7 +106,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private Long identifier;
private Integer version;
private Long schemaVersionId;
- private Integer protocol;
@Override
public SchemaIdentifier.Builder name(final String name) {
@@ -151,14 +138,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
}
@Override
- public SchemaIdentifier.Builder protocol(final Integer protocol) {
- this.protocol = protocol;
- return this;
- }
-
- @Override
public SchemaIdentifier build() {
- return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol);
+ return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch);
}
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 57ec05a..567d860 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -21,9 +21,12 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
@@ -42,6 +45,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
@@ -80,6 +84,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.identifiesControllerService(RecordSchemaCacheService.class)
.build();
+ static final PropertyDescriptor SCHEMA_PROTOCOL_VERSION = new Builder()
+ .name("schema-protocol-version")
+ .displayName("Schema Protocol Version")
+ .description("The protocol version to be used for Schema Write Strategies that require a protocol version, such as Hortonworks Schema Registry strategies. " +
+ "Valid protocol versions for Hortonworks Schema Registry are integer values 1, 2, or 3.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .build();
+
/**
* This constant is just a base spec for the actual PropertyDescriptor.
* As it can be overridden by subclasses with different AllowableValues and default value,
@@ -97,9 +112,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA));
+
private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
+ private final Set<String> schemaWriteStrategiesRequiringProtocolVersion = new HashSet<>(Arrays.asList(
+ HWX_CONTENT_ENCODED_SCHEMA.getValue(), HWX_SCHEMA_REF_ATTRIBUTES.getValue()));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -112,6 +130,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.allowableValues(strategies)
.build());
properties.add(SCHEMA_CACHE);
+ properties.add(SCHEMA_PROTOCOL_VERSION);
properties.addAll(super.getSupportedPropertyDescriptors());
return properties;
@@ -134,10 +153,18 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
public void storeSchemaWriteStrategy(final ConfigurationContext context) {
this.configurationContext = context;
+ // If Schema Protocol Version is specified without EL then we can create it up front, otherwise when
+ // EL is present we will re-create it later so we can re-evaluate the EL against the incoming variables
+
final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
if (strategy != null) {
final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
- this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
+
+ final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
+ if (!protocolVersionValue.isExpressionLanguagePresent()) {
+ final int protocolVersion = context.getProperty(SCHEMA_PROTOCOL_VERSION).asInteger();
+ this.schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
+ }
}
}
@@ -146,7 +173,27 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return configurationContext;
}
- protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema) throws SchemaNotFoundException {
+ protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema, final Map<String,String> variables) throws SchemaNotFoundException {
+ // If Schema Protocol Version is using expression language, then we reevaluate against the passed in variables
+ final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
+ if (protocolVersionValue.isExpressionLanguagePresent()) {
+ final int protocolVersion;
+ final String protocolVersionString = protocolVersionValue.evaluateAttributeExpressions(variables).getValue();
+ try {
+ protocolVersion = Integer.parseInt(protocolVersionString);
+ } catch (NumberFormatException nfe) {
+ throw new SchemaNotFoundException("Unable to create Schema Write Strategy because " + SCHEMA_PROTOCOL_VERSION.getDisplayName()
+ + " must be a positive integer, but was '" + protocolVersionString + "'", nfe);
+ }
+
+ // Now recreate the SchemaAccessWriter since we may have a new value for Schema Protocol Version
+ final String strategy = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+ if (strategy != null) {
+ final RecordSchemaCacheService recordSchemaCacheService = getConfigurationContext().getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
+ schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
+ }
+ }
+
schemaAccessWriter.validateSchema(schema);
return schemaAccessWriter;
}
@@ -164,8 +211,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return schemaAccessWriter;
}
- private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final RecordSchemaCacheService recordSchemaCacheService) {
- final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy);
+ private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final Integer protocolVersion, final RecordSchemaCacheService recordSchemaCacheService) {
+ final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, protocolVersion);
if (recordSchemaCacheService == null) {
return writer;
} else {
@@ -173,15 +220,15 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
}
}
- private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy) {
+ private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final Integer protocolVersion) {
if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
return new SchemaNameAsAttribute();
} else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
return new WriteAvroSchemaAttributeStrategy();
} else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
- return new HortonworksEncodedSchemaReferenceWriter();
+ return new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
} else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
- return new HortonworksAttributeSchemaReferenceWriter();
+ return new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
} else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
return new ConfluentSchemaRegistryWriter();
} else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
@@ -221,6 +268,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.build());
}
+ final String schemaWriteStrategy = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+ final String protocolVersion = validationContext.getProperty(SCHEMA_PROTOCOL_VERSION).getValue();
+
+ if (schemaWriteStrategy != null && schemaWriteStrategiesRequiringProtocolVersion.contains(schemaWriteStrategy) && protocolVersion == null) {
+ results.add(new ValidationResult.Builder()
+ .subject(SCHEMA_PROTOCOL_VERSION.getDisplayName())
+ .valid(false)
+ .explanation("The configured Schema Write Strategy requires a Schema Protocol Version to be specified.")
+ .build());
+ }
+
return results;
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index b20d683..cb03778 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -17,6 +17,10 @@
package org.apache.nifi.schema.access;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@@ -24,10 +28,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields;
@@ -37,7 +37,6 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
private final SchemaRegistry schemaRegistry;
- static final int LATEST_PROTOCOL_VERSION = 3;
public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
@@ -55,44 +54,52 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
@Override
public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
- final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
- final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
- final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
- if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
- + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
- }
-
if (!isNumber(schemaProtocol)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Protocol Version number");
}
final int protocol = Integer.parseInt(schemaProtocol);
- if (protocol > LATEST_PROTOCOL_VERSION) {
- throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
- + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
+ if (protocol < HortonworksProtocolVersions.MIN_VERSION || protocol > HortonworksProtocolVersions.MAX_VERSION) {
+ throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be a value between "
+ + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
}
SchemaIdentifier identifier;
- if (!isNumber(schemaVersionId)) {
- if (!isNumber(schemaIdentifier)) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
- + schemaProtocol + "', which is not a valid Schema Identifier number");
- }
-
- if (!isNumber(schemaVersion)) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
- + schemaProtocol + "', which is not a valid Schema Version number");
- }
- final long schemaId = Long.parseLong(schemaIdentifier);
- final int version = Integer.parseInt(schemaVersion);
- identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
- } else {
- final long svi = Long.parseLong(schemaVersionId);
- identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
+ switch (protocol) {
+ case 1:
+ final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
+ if (!isNumber(schemaIdentifier)) {
+ throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ + schemaIdentifier + "', which is not a valid Schema Identifier and is required by Protocol Version " + protocol);
+ }
+
+ final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
+ if (!isNumber(schemaVersion)) {
+ throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ + schemaVersion + "', which is not a valid Schema Version and is required by Protocol Version " + protocol);
+ }
+
+ final long schemaId = Long.parseLong(schemaIdentifier);
+ final int version = Integer.parseInt(schemaVersion);
+ identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
+ break;
+ case 2:
+ case 3:
+ final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
+ if (!isNumber(schemaVersionId)) {
+ throw new SchemaNotFoundException("Could not determine schema because " + SCHEMA_VERSION_ID_ATTRIBUTE + " has a value of '"
+ + schemaVersionId + "', which is not a valid Schema Version Identifier and is required by Protocol Version " + protocol);
+ }
+
+ final long svi = Long.parseLong(schemaVersionId);
+ identifier = SchemaIdentifier.builder().schemaVersionId(svi).build();
+ break;
+ default:
+ throw new SchemaNotFoundException("Unknown Protocol Version: " + protocol);
}
final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index ad4558f..24de462 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -17,21 +17,30 @@
package org.apache.nifi.schema.access;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
import java.io.IOException;
import java.io.OutputStream;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
- private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
- static final int LATEST_PROTOCOL_VERSION = 3;
+
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
+ private final int protocolVersion;
+
+ public HortonworksAttributeSchemaReferenceWriter(final int protocolVersion) {
+ this.protocolVersion = protocolVersion;
+
+ if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
+ throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
+ + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
+ }
+ }
+
@Override
public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
}
@@ -41,21 +50,29 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
final Map<String, String> attributes = new HashMap<>(4);
final SchemaIdentifier id = schema.getIdentifier();
- final Long schemaId = id.getIdentifier().getAsLong();
- final Integer schemaVersion = id.getVersion().getAsInt();
+ switch (protocolVersion) {
+ case 1:
+ final Long schemaId = id.getIdentifier().getAsLong();
+ final Integer schemaVersion = id.getVersion().getAsInt();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
+ break;
+ case 2:
+ case 3:
+ final Long schemaVersionId = id.getSchemaVersionId().getAsLong();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+ break;
+ default:
+ // Can't reach this point
+ throw new IllegalStateException("Unknown Protocol Verison: " + protocolVersion);
+ }
- attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
- attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
- attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocolVersion));
if (id.getBranch().isPresent()) {
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
}
- if (id.getSchemaVersionId().isPresent()) {
- attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
- }
-
return attributes;
}
@@ -63,19 +80,33 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier identifier = schema.getIdentifier();
- if(!identifier.getSchemaVersionId().isPresent()) {
- if (!identifier.getIdentifier().isPresent()) {
- throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
- }
- if (!identifier.getVersion().isPresent()) {
- throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
- }
+ switch (protocolVersion) {
+ case 1:
+ if (!identifier.getIdentifier().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Identifier " +
+ "is not known and is required for Protocol Version " + protocolVersion);
+ }
+ if (!identifier.getVersion().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version " +
+ "is not known and is required for Protocol Version " + protocolVersion);
+ }
+ break;
+ case 2:
+ case 3:
+ if (!identifier.getSchemaVersionId().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version Identifier " +
+ "is not known and is required for Protocol Version " + protocolVersion);
+ }
+ break;
+ default:
+ // Can't reach this point
+ throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
}
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
- return requiredSchemaFields;
+ return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 8f3c1b4..137cee7 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -17,6 +17,11 @@
package org.apache.nifi.schema.access;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -25,13 +30,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.stream.io.StreamUtils;
-
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
- private static final int LATEST_PROTOCOL_VERSION = 3;
private final Set<SchemaField> schemaFields;
private final SchemaRegistry schemaRegistry;
@@ -42,6 +41,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
schemaFields = new HashSet<>();
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
schemaFields.add(SchemaField.SCHEMA_VERSION);
+ schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
}
@@ -58,6 +58,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
final ByteBuffer bb = ByteBuffer.wrap(buffer);
final int protocolVersion = bb.get();
+
SchemaIdentifier schemaIdentifier;
switch(protocolVersion) {
@@ -69,11 +70,11 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
- final ByteBuffer bbv1 = ByteBuffer.wrap(buffer);
+ final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1);
final long schemaId = bbv1.getLong();
final int schemaVersion = bbv1.getInt();
- schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
+ schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
case 2:
@@ -84,10 +85,10 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
- final ByteBuffer bbv2 = ByteBuffer.wrap(buffer);
+ final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2);
final long sviLong = bbv2.getLong();
- schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
+ schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
case 3:
@@ -98,15 +99,16 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
- final ByteBuffer bbv3 = ByteBuffer.wrap(buffer);
+ final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3);
final int sviInt = bbv3.getInt();
- schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
+ schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
default:
- throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
- + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
+ throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. Expected Protocol Version to be a value between "
+ + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION
+ + ", but data was encoded with protocol version " + protocolVersion + ".");
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index 99dbd1f..504515a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,20 +17,28 @@
package org.apache.nifi.schema.access;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
- private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
- private static final int LATEST_PROTOCOL_VERSION = 3;
+
+ private final int protocolVersion;
+
+ public HortonworksEncodedSchemaReferenceWriter(final int protocolVersion) {
+ this.protocolVersion = protocolVersion;
+
+ if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
+ throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
+ + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
+ }
+ }
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@@ -38,7 +46,7 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
- switch(identifier.getProtocol()) {
+ switch(protocolVersion) {
case 1:
final Long id = identifier.getIdentifier().getAsLong();
final Integer version = identifier.getVersion().getAsInt();
@@ -49,25 +57,23 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
out.write(bbv1.array());
return;
case 2:
- final Long sviV2 = identifier.getIdentifier().getAsLong();
+ final Long sviV2 = identifier.getSchemaVersionId().getAsLong();
final ByteBuffer bbv2 = ByteBuffer.allocate(9);
bbv2.put((byte) 2);
bbv2.putLong(sviV2);
out.write(bbv2.array());
return;
case 3:
- final Long sviV3 = identifier.getIdentifier().getAsLong();
+ final Long sviV3 = identifier.getSchemaVersionId().getAsLong();
final ByteBuffer bbv3 = ByteBuffer.allocate(5);
bbv3.put((byte) 3);
bbv3.putInt(sviV3.intValue());
out.write(bbv3.array());
return;
default:
- throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
- + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
+ // Can't reach this point
+ throw new IllegalStateException("Unknown Protocol Version: " + this.protocolVersion);
}
-
-
}
@Override
@@ -79,19 +85,33 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier identifier = schema.getIdentifier();
- if(!identifier.getSchemaVersionId().isPresent()) {
- if (!identifier.getIdentifier().isPresent()) {
- throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
- }
- if (!identifier.getVersion().isPresent()) {
- throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
- }
+ switch (protocolVersion) {
+ case 1:
+ if (!identifier.getIdentifier().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier " +
+ "is not known and is required for Protocol Version " + protocolVersion);
+ }
+ if (!identifier.getVersion().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version " +
+ "is not known and is required for Protocol Version " + protocolVersion);
+ }
+ break;
+ case 2:
+ case 3:
+ if (!identifier.getSchemaVersionId().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version Identifier " +
+ "is not known and is required for Protocol Version " + protocolVersion);
+ }
+ break;
+ default:
+ // Can't reach this point
+ throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
}
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
- return requiredSchemaFields;
+ return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java
new file mode 100644
index 0000000..0fe0e83
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Constants related to Protocol Versions for Hortonworks Schema Registry.
+ */
+public class HortonworksProtocolVersions {
+
+ /**
+ * The minimum valid protocol version.
+ */
+ public static final int MIN_VERSION = 1;
+
+ /**
+ * The maximum valid protocol version.
+ */
+ public static final int MAX_VERSION = 3;
+
+ /**
+ * Map from protocol version to the required schema fields for the given version.
+ */
+ private static final Map<Integer, Set<SchemaField>> REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL;
+ static {
+ final Map<Integer,Set<SchemaField>> requiredFieldsByProtocol = new HashMap<>();
+ requiredFieldsByProtocol.put(1, EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION));
+ requiredFieldsByProtocol.put(2, EnumSet.of(SchemaField.SCHEMA_VERSION_ID));
+ requiredFieldsByProtocol.put(3, EnumSet.of(SchemaField.SCHEMA_VERSION_ID));
+ REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL = Collections.unmodifiableMap(requiredFieldsByProtocol);
+ }
+
+ public static Set<SchemaField> getRequiredSchemaFields(final Integer protocolVersion) {
+ return REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL.get(protocolVersion);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
index f613ab8..f9dbb20 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
@@ -42,7 +42,11 @@ public class AbstractSchemaAccessStrategyTest {
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
- .name("person").branch("master").version(1).id(1L).build();
+ .name("person")
+ .branch("master")
+ .version(1)
+ .id(1L)
+ .build();
this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier);
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
index a651a06..f41ccdf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.when;
public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
@Test
- public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException {
+ public void testGetSchemaWithValidSchemaIdVersionAndProtocol() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final int protocol = 1;
@@ -56,9 +56,115 @@ public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSch
assertNotNull(retrievedSchema);
}
+ @Test
+ public void testGetSchemaWithValidSchemaVersionIdAndProtocol() throws IOException, SchemaNotFoundException {
+ final long schemaVersionId = 9999;
+ final int protocol = 2;
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+ final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+ final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+ .schemaVersionId(schemaVersionId)
+ .build();
+
+ when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+ .thenReturn(recordSchema);
+
+ final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+ assertNotNull(retrievedSchema);
+ }
+
+ @Test
+ public void testGetSchemaWithAllAttributes() throws IOException, SchemaNotFoundException {
+ final long schemaId = 123456;
+ final int version = 2;
+ final long schemaVersionId = 9999;
+ final int protocol = 2;
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+ final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+ // The schema version id should take precedence
+ final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+ .schemaVersionId(schemaVersionId)
+ .build();
+
+ when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+ .thenReturn(recordSchema);
+
+ final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+ assertNotNull(retrievedSchema);
+ }
+
@Test(expected = SchemaNotFoundException.class)
- public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException {
+ public void testGetSchemaMissingAllAttributes() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
}
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testGetSchemaMissingProtocol() throws IOException, SchemaNotFoundException {
+ final long schemaId = 123456;
+ final int version = 2;
+ final long schemaVersionId = 9999;
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+
+ final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+ schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException {
+ final long schemaId = 123456;
+ final int version = 2;
+ final long schemaVersionId = 9999;
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, "INVALID_PROTOCOL");
+
+ final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+ schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testGetSchemaNotFound() throws IOException, SchemaNotFoundException {
+ final long schemaId = 123456;
+ final int version = 2;
+ final long schemaVersionId = 9999;
+ final int protocol = 2;
+
+ final Map<String,String> attributes = new HashMap<>();
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
+
+ final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+ // The schema version id should take precedence
+ final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
+ .schemaVersionId(schemaVersionId)
+ .build();
+
+ when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+ .thenReturn(null);
+
+ schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+ }
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
index ea3be57..dd7e034 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
@@ -31,29 +31,75 @@ import java.util.Map;
public class TestHortonworksAttributeSchemaReferenceWriter {
@Test
- public void testValidateWithValidSchema() throws SchemaNotFoundException {
+ public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
- final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
- public void testValidateWithInvalidSchema() throws SchemaNotFoundException {
+ public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
- final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test
+ public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test
+ public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
- public void testGetAttributesWithoutBranch() {
+ public void testGetAttributesWithProtocol1() {
+ final Integer protocolVersion = 1;
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
- final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(3, attributes.size());
@@ -64,16 +110,17 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
- Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+ Assert.assertEquals(String.valueOf(protocolVersion),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
}
@Test
- public void testGetAttributesWithBranch() {
+ public void testGetAttributesWithProtocol1AndBranch() {
+ final Integer protocolVersion = 1;
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
- final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(4, attributes.size());
@@ -84,10 +131,52 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
- Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+ Assert.assertEquals(String.valueOf(protocolVersion),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
- Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+ Assert.assertEquals(schemaIdentifier.getBranch().get(),
+ attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+ }
+
+ @Test
+ public void testGetAttributesWithProtocol2() {
+ final Integer protocolVersion = 2;
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
+ final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
+
+ Assert.assertEquals(2, attributes.size());
+
+ Assert.assertEquals(String.valueOf(protocolVersion),
+ attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+ Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
+ attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
+ }
+
+ @Test
+ public void testGetAttributesWithProtocol3() {
+ final Integer protocolVersion = 3;
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
+ final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
+
+ Assert.assertEquals(2, attributes.size());
+
+ Assert.assertEquals(String.valueOf(protocolVersion),
+ attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+ Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
+ attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidProtocolVersion() {
+ new HortonworksAttributeSchemaReferenceWriter(99);
}
private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
index b0589ea..524c8b2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
@@ -18,6 +18,8 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
@@ -26,17 +28,71 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestHortonworksEncodedSchemaReferenceWriter {
@Test
- public void testHeader() throws IOException {
- final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter();
+ public void testEncodeProtocolVersion1() throws IOException {
+ final long id = 48;
+ final int version = 2;
+ final int protocolVersion = 1;
- final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build());
+ final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+ final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+ SchemaIdentifier.builder().name("name").id(id).version(version).build());
+
+ final byte[] header;
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ writer.writeHeader(schema, baos);
+ header = baos.toByteArray();
+ }
+
+ try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
+ assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+ assertEquals(id, dis.readLong()); // verify schema id
+ assertEquals(version, dis.readInt()); // verify schema version
+ assertEquals(-1, dis.read()); // no more bytes
+ }
+ }
+
+ @Test
+ public void testEncodeProtocolVersion2() throws IOException {
+ final long schemaVersionId = 123;
+ final int protocolVersion = 2;
+
+ final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+ final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+ SchemaIdentifier.builder().schemaVersionId(schemaVersionId).build());
+
+ final byte[] header;
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ writer.writeHeader(schema, baos);
+ header = baos.toByteArray();
+ }
+
+ try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
+ assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+ assertEquals(schemaVersionId, dis.readLong()); // verify schema version id
+ assertEquals(-1, dis.read()); // no more bytes
+ }
+ }
+
+ @Test
+ public void testEncodeProtocolVersion3() throws IOException {
+ final int schemaVersionId = 123;
+ final int protocolVersion = 3;
+
+ final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
+
+ final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
+ SchemaIdentifier.builder().schemaVersionId((long)schemaVersionId).build());
final byte[] header;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
@@ -45,11 +101,79 @@ public class TestHortonworksEncodedSchemaReferenceWriter {
}
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
- assertEquals(1, dis.read()); // verify 'protocol version'
- assertEquals(48, dis.readLong()); // verify schema id
- assertEquals(2, dis.readInt()); // verify schema version
+ assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
+ assertEquals(schemaVersionId, dis.readInt()); // verify schema version id
assertEquals(-1, dis.read()); // no more bytes
}
}
+ @Test
+ public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test
+ public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test
+ public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ @Test(expected = SchemaNotFoundException.class)
+ public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
+ final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
+ final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+ final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
+ schemaAccessWriter.validateSchema(recordSchema);
+ }
+
+ private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
+ return new SimpleRecordSchema(fields, schemaIdentifier);
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index c26c55a..317c5e6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -426,7 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName.get())
.branch(schemaBranchName.orElse(null))
.version(versionInfo.getVersion())
- .protocol(schemaIdentifier.getProtocol())
+ .schemaVersionId(versionInfo.getId())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -477,7 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName)
.id(schemaId.getAsLong())
.version(version.getAsInt())
- .protocol(schemaIdentifier.getProtocol())
+ .schemaVersionId(versionInfo.getId())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -522,7 +522,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.id(versionInfo.getSchemaMetadataId())
.version(versionInfo.getVersion())
.schemaVersionId(schemaVersionId.getAsLong())
- .protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 5730ee3..08c0887 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -148,7 +148,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat));
} else {
- return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger());
+ return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema, variables), out, encoderPool, getLogger());
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index 9326d8a..ca934a7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -17,13 +17,6 @@
package org.apache.nifi.csv;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -37,6 +30,13 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
+ "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
@@ -92,7 +92,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
csvFormat = CSVUtils.createCSVFormat(context, variables);
}
- return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
+ return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index 864574f..21ff5c6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -17,14 +17,6 @@
package org.apache.nifi.json;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -49,6 +41,14 @@ import org.tukaani.xz.XZOutputStream;
import org.xerial.snappy.SnappyFramedOutputStream;
import org.xerial.snappy.SnappyOutputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
@CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
+ "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
@@ -210,7 +210,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
throw new IOException(e);
}
- return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
+ return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
index 8787498..6ad7006 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java
@@ -203,7 +203,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R
final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue();
- return new WriteXMLResult(schema, getSchemaAccessWriter(schema),
+ return new WriteXMLResult(schema, getSchemaAccessWriter(schema, variables),
out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
}