You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/03/20 14:57:12 UTC
[nifi] 01/02: NIFI-7221 Initial work
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1fe79021b59fb8763b4af3e911658800bed3d41a
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Wed Mar 4 19:25:26 2020 +0100
NIFI-7221 Initial work
---
.../org/apache/nifi/schema/access/SchemaField.java | 1 +
.../serialization/record/SchemaIdentifier.java | 16 ++++-
.../record/StandardSchemaIdentifier.java | 53 +++++++++++++--
...ortonworksAttributeSchemaReferenceStrategy.java | 52 ++++++++-------
.../HortonworksAttributeSchemaReferenceWriter.java | 29 +++++----
.../HortonworksEncodedSchemaReferenceStrategy.java | 75 ++++++++++++++++------
.../HortonworksEncodedSchemaReferenceWriter.java | 65 ++++++++++++-------
.../hortonworks/HortonworksSchemaRegistry.java | 50 ++++++++++++++-
8 files changed, 257 insertions(+), 84 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
index 1844eea..f577f05 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -22,6 +22,7 @@ public enum SchemaField {
SCHEMA_TEXT_FORMAT("Schema Text Format"),
SCHEMA_NAME("Schema Name"),
SCHEMA_IDENTIFIER("Schema Identifier"),
+ SCHEMA_VERSION_ID("Schema-Version Identifier"), // a single ID referencing one specific schema version, usable instead of Schema Identifier + Schema Version
SCHEMA_VERSION("Schema Version"),
SCHEMA_BRANCH_NAME("Schema Branch Name");
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index bca408a..4fb5371 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -39,12 +39,22 @@ public interface SchemaIdentifier {
OptionalInt getVersion();
/**
+ * @return the schema version ID, i.e. an ID referencing one specific version of the schema (as used by the Hortonworks Schema Registry), if one has been defined
+ */
+ OptionalLong getSchemaVersionId();
+
+ /**
* @return the name of the branch where the schema is located, if one has been defined
*/
Optional<String> getBranch();
+ /**
+ * @return the version of the protocol with which this schema identifier was read or should be written (e.g. Hortonworks encoding protocol 1, 2 or 3); {@link #EMPTY} uses -1
+ */
+ Integer getProtocol();
+
- SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null);
+ SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
static Builder builder() {
return new StandardSchemaIdentifier.Builder();
@@ -61,8 +71,12 @@ public interface SchemaIdentifier {
Builder version(Integer version);
+ Builder schemaVersionId(Long schemaVersionId);
+
Builder branch(String branch);
+ Builder protocol(Integer protocol);
+
SchemaIdentifier build();
}
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 712486b..0800982 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -25,15 +25,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private final Optional<String> name;
private final OptionalLong identifier;
private final OptionalInt version;
+ private final OptionalLong schemaVersionId;
private final Optional<String> branch;
+ private final int protocol;
- StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) {
+ StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
+ final Long schemaVersionId, final String branch, final int protocol) {
this.name = Optional.ofNullable(name);
- this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
- this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
+ this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
+ this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
+ this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
this.branch = Optional.ofNullable(branch);
+ this.protocol = protocol;
+ // NOTE: the Optional fields above are never null, so this check never fires. It must not be tightened
+ // (e.g. to require a name, identifier or schema version ID) because EMPTY legitimately has none of them.
if (this.name == null && this.identifier == null) {
throw new IllegalStateException("Name or Identifier must be provided");
}
}
@@ -54,13 +59,24 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
}
@Override
+ public OptionalLong getSchemaVersionId() {
+ return schemaVersionId;
+ }
+
+ @Override
public Optional<String> getBranch() {
return branch;
}
@Override
+ public Integer getProtocol() {
+ return protocol;
+ }
+
+ @Override
public int hashCode() {
- return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode();
+ return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
+ + 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode();
}
@Override
@@ -78,9 +94,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return getName().equals(other.getName())
&& getIdentifier().equals(other.getIdentifier())
&& getVersion().equals(other.getVersion())
+ && getSchemaVersionId().equals(other.getSchemaVersionId())
&& getBranch().equals(other.getBranch());
}
+ @Override
+ public String toString() {
+ return "[ name = " + name + ", "
+ + "identifier = " + identifier + ", "
+ + "version = " + version + ", "
+ + "schemaVersionId = " + schemaVersionId + ", "
+ + "branch = " + branch + ", "
+ + "protocol = " + protocol + " ]";
+ }
+
/**
* Builder to create instances of SchemaIdentifier.
*/
@@ -90,6 +117,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private String branch;
private Long identifier;
private Integer version;
+ private Long schemaVersionId;
+ private Integer protocol; // null means "not set"; build() falls back to protocol 1
@Override
public SchemaIdentifier.Builder name(final String name) {
@@ -116,8 +145,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
}
@Override
+ public SchemaIdentifier.Builder schemaVersionId(final Long schemaVersionId) {
+ this.schemaVersionId = schemaVersionId;
+ return this;
+ }
+
+ @Override
+ public SchemaIdentifier.Builder protocol(final Integer protocol) {
+ this.protocol = protocol;
+ return this;
+ }
+
+ @Override
public SchemaIdentifier build() {
- return new StandardSchemaIdentifier(name, identifier, version, branch);
+ // The builder keeps the protocol boxed but the constructor takes an int: passing a null would throw a
+ // NullPointerException for every caller that never calls protocol(...). Fall back to protocol 1,
+ // the only protocol the Hortonworks reference writers produced before protocols 2 and 3 existed.
+ final int effectiveProtocol = protocol == null ? 1 : protocol;
+ return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, effectiveProtocol);
}
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index f4fdfcb..b20d683 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -17,10 +17,6 @@
package org.apache.nifi.schema.access;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@@ -28,16 +24,20 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields;
public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
-
+ public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
private final SchemaRegistry schemaRegistry;
-
+ static final int LATEST_PROTOCOL_VERSION = 3;
public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
@@ -45,6 +45,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
schemaFields = new HashSet<>();
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
schemaFields.add(SchemaField.SCHEMA_VERSION);
+ schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
}
@@ -57,7 +58,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
- if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
- + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
+ final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
+ // The protocol version is always required; the schema itself is referenced either by schema.version.id alone
+ // or by schema.identifier together with schema.version.
+ if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
+ throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing required attributes: "
+ + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " and either " + SCHEMA_VERSION_ID_ATTRIBUTE + " or both "
+ + SCHEMA_ID_ATTRIBUTE + " and " + SCHEMA_VERSION_ATTRIBUTE);
}
@@ -68,28 +70,34 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
}
final int protocol = Integer.parseInt(schemaProtocol);
- if (protocol != 1) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
- + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1.");
+ // Only protocol versions 1 through LATEST_PROTOCOL_VERSION are known; reject zero/negative values as well.
+ if (protocol < 1 || protocol > LATEST_PROTOCOL_VERSION) {
+ throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be between 1 and "
+ + LATEST_PROTOCOL_VERSION + ".");
}
- if (!isNumber(schemaIdentifier)) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
- + schemaProtocol + "', which is not a valid Schema Identifier number");
- }
- if (!isNumber(schemaVersion)) {
- throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
- + schemaProtocol + "', which is not a valid Schema Version number");
- }
- final long schemaId = Long.parseLong(schemaIdentifier);
- final int version = Integer.parseInt(schemaVersion);
- final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
+ final SchemaIdentifier identifier;
+ if (schemaVersionId != null && isNumber(schemaVersionId)) {
+ // A numeric schema version ID is enough on its own to reference one specific schema version.
+ final long svi = Long.parseLong(schemaVersionId);
+ identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
+ } else {
+ // Otherwise fall back to schema.identifier + schema.version; report the offending value in the message.
+ if (schemaIdentifier == null || !isNumber(schemaIdentifier)) {
+ throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ + schemaIdentifier + "', which is not a valid Schema Identifier number");
+ }
+ if (schemaVersion == null || !isNumber(schemaVersion)) {
+ throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ + schemaVersion + "', which is not a valid Schema Version number");
+ }
+ final long schemaId = Long.parseLong(schemaIdentifier);
+ final int version = Integer.parseInt(schemaVersion);
+ identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
+ }
final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
if (schema == null) {
- throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
+ throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + identifier.toString() + "'");
}
return schema;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index bea2f96..ad4558f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -17,9 +17,6 @@
package org.apache.nifi.schema.access;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
@@ -27,9 +24,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
- static final int LATEST_PROTOCOL_VERSION = 1;
+ static final int LATEST_PROTOCOL_VERSION = 3;
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
@Override
@@ -46,23 +46,30 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
- attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION));
+ // Propagate the protocol carried by the schema identifier rather than a fixed protocol version.
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
if (id.getBranch().isPresent()) {
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
}
+ if (id.getSchemaVersionId().isPresent()) {
+ attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
+ }
+
return attributes;
}
@Override
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
- final SchemaIdentifier id = schema.getIdentifier();
- if (!id.getIdentifier().isPresent()) {
- throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
- }
- if (!id.getVersion().isPresent()) {
- throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+ final SchemaIdentifier identifier = schema.getIdentifier();
+
+ // A schema version ID is a sufficient reference on its own; otherwise both identifier and version are needed.
+ // NOTE(review): getAttributes() writes schemaId/schemaVersion unconditionally - confirm how it derives them
+ // for a schema that carries only a schema version ID, since such a schema passes this check.
+ if (!identifier.getSchemaVersionId().isPresent()) {
+ if (!identifier.getIdentifier().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
+ }
+ if (!identifier.getVersion().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 077016a..8f3c1b4 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -17,11 +17,6 @@
package org.apache.nifi.schema.access;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.stream.io.StreamUtils;
-
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -30,8 +25,13 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
- private static final int LATEST_PROTOCOL_VERSION = 1;
+ private static final int LATEST_PROTOCOL_VERSION = 3;
private final Set<SchemaField> schemaFields;
private final SchemaRegistry schemaRegistry;
@@ -47,28 +47,67 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
@Override
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
- final byte[] buffer = new byte[13];
+ final byte[] buffer = new byte[1];
try {
StreamUtils.fillBuffer(contentStream, buffer);
} catch (final IOException ioe) {
- throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe);
+ throw new SchemaNotFoundException("Could not read first byte from stream", ioe);
}
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
- // as it is provided at:
- // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+ // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
final ByteBuffer bb = ByteBuffer.wrap(buffer);
final int protocolVersion = bb.get();
- if (protocolVersion != 1) {
- throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
- + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
- }
- final long schemaId = bb.getLong();
- final int schemaVersion = bb.getInt();
- final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
+ // The bytes following the protocol byte depend on the protocol version:
+ // 1: 8-byte schema metadata ID followed by a 4-byte schema version
+ // 2: 8-byte schema version ID
+ // 3: 4-byte schema version ID
+ final SchemaIdentifier schemaIdentifier;
+ switch (protocolVersion) {
+ case 1: {
+ final ByteBuffer bbv1 = readHeaderBytes(contentStream, 12);
+ final long schemaId = bbv1.getLong();
+ final int schemaVersion = bbv1.getInt();
+ schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
+ break;
+ }
+ case 2: {
+ final long sviLong = readHeaderBytes(contentStream, 8).getLong();
+ schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
+ break;
+ }
+ case 3: {
+ final int sviInt = readHeaderBytes(contentStream, 4).getInt();
+ schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
+ break;
+ }
+ default:
+ throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
+ }
return schemaRegistry.retrieveSchema(schemaIdentifier);
}
+
+ /**
+ * Reads exactly {@code length} further bytes of the encoded schema reference from the stream.
+ *
+ * @param contentStream the stream, positioned just after the bytes already consumed
+ * @param length the number of bytes to read
+ * @return a buffer wrapping exactly the bytes that were read, positioned at its start
+ * @throws SchemaNotFoundException if the bytes could not be read from the stream
+ */
+ private static ByteBuffer readHeaderBytes(final InputStream contentStream, final int length) throws SchemaNotFoundException {
+ final byte[] header = new byte[length];
+ try {
+ StreamUtils.fillBuffer(contentStream, header);
+ } catch (final IOException ioe) {
+ throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
+ }
+ // Wrap the bytes just read - not the one-byte protocol buffer - so getLong()/getInt() decode the reference.
+ return ByteBuffer.wrap(header);
+ }
@Override
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index cb4ed4e..99dbd1f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,38 +17,57 @@
package org.apache.nifi.schema.access;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
import java.util.Set;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
- private static final int LATEST_PROTOCOL_VERSION = 1;
+ private static final int LATEST_PROTOCOL_VERSION = 3;
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
final SchemaIdentifier identifier = schema.getIdentifier();
- final Long id = identifier.getIdentifier().getAsLong();
- final Integer version = identifier.getVersion().getAsInt();
- // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
- // as it is provided at:
- // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
- final ByteBuffer bb = ByteBuffer.allocate(13);
- bb.put((byte) LATEST_PROTOCOL_VERSION);
- bb.putLong(id);
- bb.putInt(version);
+ // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
+ // See: https://registry-project.readthedocs.io/en/latest/serdes.html#
+ // Header layout, selected by the protocol carried on the schema identifier:
+ // 1: protocol byte, 8-byte schema metadata ID, 4-byte schema version (13 bytes)
+ // 2: protocol byte, 8-byte schema version ID (9 bytes)
+ // 3: protocol byte, 4-byte schema version ID (5 bytes)
+ final int protocol = identifier.getProtocol();
+ final ByteBuffer bb;
+ switch (protocol) {
+ case 1:
+ bb = ByteBuffer.allocate(13);
+ bb.put((byte) 1);
+ bb.putLong(identifier.getIdentifier().getAsLong());
+ bb.putInt(identifier.getVersion().getAsInt());
+ break;
+ case 2:
+ // Protocols 2 and 3 reference the schema version ID, not the schema metadata identifier.
+ bb = ByteBuffer.allocate(9);
+ bb.put((byte) 2);
+ bb.putLong(identifier.getSchemaVersionId().getAsLong());
+ break;
+ case 3: {
+ final long schemaVersionId = identifier.getSchemaVersionId().getAsLong();
+ // Protocol 3 only has room for a 4-byte ID; refuse rather than silently truncating it.
+ if (schemaVersionId < Integer.MIN_VALUE || schemaVersionId > Integer.MAX_VALUE) {
+ throw new IOException("Cannot write Encoded Schema Reference with Protocol Version 3 because the Schema Version ID "
+ + schemaVersionId + " does not fit in 4 bytes");
+ }
+ bb = ByteBuffer.allocate(5);
+ bb.put((byte) 3);
+ bb.putInt((int) schemaVersionId);
+ break;
+ }
+ default:
+ throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
+ }
out.write(bb.array());
}
@Override
@@ -59,14 +78,14 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
@Override
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier identifier = schema.getIdentifier();
- final OptionalLong identifierOption = identifier.getIdentifier();
- if (!identifierOption.isPresent()) {
- throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
- }
- final OptionalInt versionOption = identifier.getVersion();
- if (!versionOption.isPresent()) {
- throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+ // Check exactly the fields that writeHeader() needs for the identifier's protocol.
+ switch (identifier.getProtocol()) {
+ case 1:
+ if (!identifier.getIdentifier().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
+ }
+ if (!identifier.getVersion().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+ }
+ break;
+ case 2:
+ case 3:
+ if (!identifier.getSchemaVersionId().isPresent()) {
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version ID is not known");
+ }
+ break;
+ default:
+ throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because Protocol Version " + identifier.getProtocol()
+ + " is not supported. The latest known Protocol is Version " + LATEST_PROTOCOL_VERSION);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index 0e42afc..c26c55a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -17,6 +17,7 @@
package org.apache.nifi.schemaregistry.hortonworks;
import com.google.common.collect.ImmutableMap;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
@@ -64,8 +65,13 @@ import java.util.concurrent.TimeUnit;
@Tags({"schema", "registry", "avro", "hortonworks", "hwx"})
@CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry")
public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
- private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT,
- SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+ private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME,
+ SchemaField.SCHEMA_BRANCH_NAME,
+ SchemaField.SCHEMA_TEXT,
+ SchemaField.SCHEMA_TEXT_FORMAT,
+ SchemaField.SCHEMA_IDENTIFIER,
+ SchemaField.SCHEMA_VERSION,
+ SchemaField.SCHEMA_VERSION_ID);
private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
@@ -420,6 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName.get())
.branch(schemaBranchName.orElse(null))
.version(versionInfo.getVersion())
+ .protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -470,6 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName)
.id(schemaId.getAsLong())
.version(version.getAsInt())
+ .protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@@ -481,13 +489,49 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
- if (schemaIdentifier.getIdentifier().isPresent()) {
+ // Prefer the schema version ID when present: the lookup below needs nothing else to find one specific schema version.
+ if (schemaIdentifier.getSchemaVersionId().isPresent()) {
+ return retrieveSchemaBySchemaVersionId(schemaIdentifier);
+ } else if (schemaIdentifier.getIdentifier().isPresent()) {
return retrieveSchemaByIdAndVersion(schemaIdentifier);
} else {
return retrieveSchemaByName(schemaIdentifier);
}
}
+ /**
+ * Retrieves a schema from the registry using only the schema version ID of the given identifier.
+ * The returned schema's identifier is filled in with the name, schema metadata ID and version reported
+ * by the registry, plus the requested schema version ID and the caller's protocol.
+ *
+ * @param schemaIdentifier an identifier whose schema version ID is present
+ * @return the parsed schema, cached in schemaNameToSchemaMap keyed by (result identifier, schema text)
+ * NOTE(review): SchemaIdentifier equality does not include the protocol, so a cached schema may carry the
+ * protocol of whichever request populated the cache first - confirm this is acceptable for writers that echo it.
+ */
+ private RecordSchema retrieveSchemaBySchemaVersionId(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
+ final SchemaRegistryClient client = getClient();
+ final OptionalLong schemaVersionId = schemaIdentifier.getSchemaVersionId();
+
+ final SchemaIdVersion svi = new SchemaIdVersion(schemaVersionId.getAsLong());
+
+ final String schemaName;
+ final SchemaVersionInfo versionInfo;
+
+ try {
+ versionInfo = client.getSchemaVersionInfo(svi);
+ schemaName = versionInfo.getName();
+ } catch (final Exception e) {
+ handleException("Failed to retrieve schema with Schema Version ID '" + schemaVersionId.getAsLong() + "'", e);
+ return null; // NOTE(review): reached only if handleException does not rethrow - confirm
+ }
+
+ final String schemaText = versionInfo.getSchemaText();
+
+ final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder()
+ .name(schemaName)
+ .id(versionInfo.getSchemaMetadataId())
+ .version(versionInfo.getVersion())
+ .schemaVersionId(schemaVersionId.getAsLong())
+ .protocol(schemaIdentifier.getProtocol())
+ .build();
+
+ final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
+ return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
+ final Schema schema = new Schema.Parser().parse(schemaText);
+ return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier);
+ });
+ }
+
private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) {
final StringBuilder builder = new StringBuilder(baseMessage)
.append(" with name '")