You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/24 21:03:48 UTC

[2/7] nifi git commit: NIFI-3682: This closes #1682. Add Schema Access Strategy and Schema Write Strategy Record Readers and Writers; bug fixes.

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
new file mode 100644
index 0000000..4eec14e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
+    private final Set<SchemaField> schemaFields;
+
+    public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
+    public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
+    public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
+
+    private final SchemaRegistry schemaRegistry;
+
+
+    public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+
+        schemaFields = new HashSet<>();
+        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
+        schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
+    }
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException, IOException {
+        final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
+        final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
+        final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
+        if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because it is missing one of the following three required attributes: "
+                + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
+        }
+
+        if (!isNumber(schemaProtocol)) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+                + schemaProtocol + "', which is not a valid Protocol Version number");
+        }
+
+        final int protocol = Integer.parseInt(schemaProtocol);
+        if (protocol != 1) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+                + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1.");
+        }
+
+        if (!isNumber(schemaIdentifier)) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+                + schemaProtocol + "', which is not a valid Schema Identifier number");
+        }
+
+        if (!isNumber(schemaVersion)) {
+            throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+                + schemaProtocol + "', which is not a valid Schema Version number");
+        }
+
+        final long schemaId = Long.parseLong(schemaIdentifier);
+        final int version = Integer.parseInt(schemaVersion);
+
+        final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, version);
+        if (schema == null) {
+            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'");
+        }
+
+        return schema;
+    }
+
+    private static boolean isNumber(final String value) {
+        if (value == null) {
+            return false;
+        }
+
+        final String trimmed = value.trim();
+        if (value.isEmpty()) {
+            return false;
+        }
+
+        for (int i = 0; i < trimmed.length(); i++) {
+            final char c = value.charAt(i);
+            if (c > '9' || c < '0') {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
new file mode 100644
index 0000000..f492ec4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+/**
+ * A {@link SchemaAccessWriter} that conveys a reference to the schema (its identifier, version, and the
+ * protocol version) as FlowFile attributes rather than writing anything into the FlowFile content.
+ */
+public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
+    private static final int LATEST_PROTOCOL_VERSION = 1;
+    private static final Set<SchemaField> REQUIRED_SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+
+    /**
+     * Writes nothing; this writer conveys the schema reference through attributes only.
+     */
+    @Override
+    public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
+    }
+
+    /**
+     * Builds the attributes that reference the given schema. The schema's identifier and version are read
+     * without a presence check here; {@link #validateSchema(RecordSchema)} is the method that checks for them.
+     *
+     * @param schema the schema to reference
+     * @return a map holding the schema identifier, schema version, and protocol version attributes
+     */
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final SchemaIdentifier schemaIdentifier = schema.getIdentifier();
+        final String idValue = String.valueOf(schemaIdentifier.getIdentifier().getAsLong());
+        final String versionValue = String.valueOf(schemaIdentifier.getVersion().getAsInt());
+        final String protocolValue = String.valueOf(LATEST_PROTOCOL_VERSION);
+
+        final Map<String, String> referenceAttributes = new HashMap<>(4);
+        referenceAttributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, idValue);
+        referenceAttributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, versionValue);
+        referenceAttributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, protocolValue);
+        return referenceAttributes;
+    }
+
+    /**
+     * Verifies that the schema carries both an identifier and a version.
+     *
+     * @param schema the schema to validate
+     * @throws SchemaNotFoundException if the schema's identifier or version is not present
+     */
+    @Override
+    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = schema.getIdentifier();
+
+        final boolean identifierKnown = schemaIdentifier.getIdentifier().isPresent();
+        if (!identifierKnown) {
+            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier");
+        }
+
+        final boolean versionKnown = schemaIdentifier.getVersion().isPresent();
+        if (!versionKnown) {
+            throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return REQUIRED_SCHEMA_FIELDS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
new file mode 100644
index 0000000..081e97c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.StreamUtils;
+
+/**
+ * A {@link SchemaAccessStrategy} that resolves the schema for a FlowFile by decoding a schema reference from
+ * the first 13 bytes of the content (1-byte protocol version, 8-byte schema identifier, 4-byte schema version)
+ * and looking the schema up in a {@link SchemaRegistry}.
+ */
+public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
+    private static final int LATEST_PROTOCOL_VERSION = 1;
+
+    private final Set<SchemaField> schemaFields;
+    private final SchemaRegistry schemaRegistry;
+
+    /**
+     * @param schemaRegistry the registry used to retrieve schemas. If null, only the Schema Identifier and
+     *            Schema Version fields are reported by {@link #getSuppliedSchemaFields()}.
+     */
+    public HortonworksEncodedSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+
+        schemaFields = new HashSet<>();
+        schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
+        schemaFields.add(SchemaField.SCHEMA_VERSION);
+        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
+    }
+
+    /**
+     * Reads the encoded schema reference from the start of the content stream and retrieves the referenced schema.
+     *
+     * @param flowFile not used by this strategy
+     * @param contentStream the FlowFile content; the first 13 bytes are consumed from it
+     * @param context not used by this strategy
+     * @return the schema retrieved from the Schema Registry
+     * @throws SchemaNotFoundException if the first 13 bytes cannot be read, if the encoded protocol version is not
+     *             the one this class understands, or if the Schema Registry has no matching schema
+     */
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException, IOException {
+        final byte[] buffer = new byte[13];
+        try {
+            StreamUtils.fillBuffer(contentStream, buffer);
+        } catch (final IOException ioe) {
+            throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe);
+        }
+
+        // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
+        // as it is provided at:
+        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+        final ByteBuffer bb = ByteBuffer.wrap(buffer);
+        final int protocolVersion = bb.get();
+        if (protocolVersion != LATEST_PROTOCOL_VERSION) {
+            throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+                + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion);
+        }
+
+        final long schemaId = bb.getLong();
+        final int schemaVersion = bb.getInt();
+
+        // Fail with a descriptive exception rather than handing a null schema back to the caller,
+        // consistent with HortonworksAttributeSchemaReferenceStrategy.
+        final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, schemaVersion);
+        if (schema == null) {
+            throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + schemaVersion + "'");
+        }
+
+        return schema;
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
new file mode 100644
index 0000000..bf6a9ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+/**
+ * A {@link SchemaAccessWriter} that encodes a reference to the schema as a 13-byte header written to the
+ * content: a 1-byte protocol version, the 8-byte schema identifier, and the 4-byte schema version.
+ */
+public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
+    private static final int LATEST_PROTOCOL_VERSION = 1;
+    private static final Set<SchemaField> REQUIRED_SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+
+    /**
+     * Writes the encoded schema reference to the given stream. The schema's identifier and version are read
+     * without a presence check here; {@link #validateSchema(RecordSchema)} is the method that checks for them.
+     *
+     * @param schema the schema to reference
+     * @param out the stream to write the 13-byte header to
+     * @throws IOException if unable to write to the given stream
+     */
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
+        final SchemaIdentifier schemaIdentifier = schema.getIdentifier();
+        final long schemaId = schemaIdentifier.getIdentifier().getAsLong();
+        final int schemaVersion = schemaIdentifier.getVersion().getAsInt();
+
+        // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
+        // as it is provided at:
+        // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java
+        final byte[] header = ByteBuffer.allocate(13)
+            .put((byte) LATEST_PROTOCOL_VERSION)
+            .putLong(schemaId)
+            .putInt(schemaVersion)
+            .array();
+
+        out.write(header);
+    }
+
+    /**
+     * @param schema the schema being written; not consulted
+     * @return an empty map, because this writer conveys the schema reference in the content only
+     */
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Verifies that the schema carries both an identifier and a version.
+     *
+     * @param schema the schema to validate
+     * @throws SchemaNotFoundException if the schema's identifier or version is not present
+     */
+    @Override
+    public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = schema.getIdentifier();
+
+        final OptionalLong schemaId = schemaIdentifier.getIdentifier();
+        if (!schemaId.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
+        }
+
+        final OptionalInt schemaVersion = schemaIdentifier.getVersion();
+        if (!schemaVersion.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
+        }
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return REQUIRED_SCHEMA_FIELDS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
new file mode 100644
index 0000000..6635e3d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Strategy for determining the {@link RecordSchema} that applies to a FlowFile.
+ */
+public interface SchemaAccessStrategy {
+    /**
+     * Returns the schema for the given FlowFile using the supplied stream of content and configuration
+     *
+     * @param flowFile flowfile
+     * @param contentStream content of flowfile
+     * @param context configuration
+     * @return the RecordSchema for the FlowFile
+     * @throws SchemaNotFoundException if the schema for the FlowFile cannot be determined or found
+     * @throws IOException declared for implementations that perform I/O; presumably thrown when the content
+     *             cannot be read (NOTE(review): confirm the intended contract with implementations)
+     */
+    RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, ConfigurationContext context) throws SchemaNotFoundException, IOException;
+
+    /**
+     * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream, ConfigurationContext)}.
+     */
+    Set<SchemaField> getSuppliedSchemaFields();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
new file mode 100644
index 0000000..30a995c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaAccessWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Describes how information about a {@link RecordSchema} is conveyed alongside written data: as header
+ * bytes in the content, as FlowFile attributes, or both.
+ */
+public interface SchemaAccessWriter {
+
+    /**
+     * Writes the given Record Schema to the given OutputStream as header information, if appropriate,
+     * or returns without writing anything if the implementation does not need to write information to
+     * the contents of the FlowFile
+     *
+     * @param schema the schema to write
+     * @param out the OutputStream to write to
+     * @throws IOException if unable to write to the given stream
+     */
+    void writeHeader(RecordSchema schema, OutputStream out) throws IOException;
+
+    /**
+     * Returns a Map of String to String that represent the attributes that should be added to the FlowFile, or
+     * an empty map if no attributes should be added.
+     *
+     * @param schema the schema whose information should be represented as attributes
+     * @return a Map of attributes to add to the FlowFile.
+     */
+    Map<String, String> getAttributes(RecordSchema schema);
+
+    /**
+     * Ensures that the given schema can be written by this SchemaAccessWriter or throws SchemaNotFoundException if
+     * the schema does not contain sufficient information to be written
+     *
+     * @param schema the schema to validate
+     * @throws SchemaNotFoundException if the schema does not contain sufficient information to be written
+     */
+    void validateSchema(RecordSchema schema) throws SchemaNotFoundException;
+
+    /**
+     * Specifies the set of SchemaField's that are required in order to use this Schema Access Writer
+     *
+     * @return the set of SchemaField's that are required in order to use this Schema Access Writer
+     */
+    Set<SchemaField> getRequiredSchemaFields();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
new file mode 100644
index 0000000..54a248d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
+/**
+ * A {@link SchemaAccessWriter} that conveys the name of the schema as the <code>schema.name</code> FlowFile
+ * attribute and writes nothing to the FlowFile content.
+ */
+public class SchemaNameAsAttribute implements SchemaAccessWriter {
+    private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
+    private static final Set<SchemaField> REQUIRED_SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_NAME);
+
+    /**
+     * Writes nothing; this writer conveys the schema name through an attribute only.
+     */
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
+    }
+
+    /**
+     * @param schema the schema whose name should be exposed
+     * @return a single-entry map holding the schema name, or an empty map if the schema has no name
+     */
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final Optional<String> schemaName = schema.getIdentifier().getName();
+        return schemaName
+            .map(name -> Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, name))
+            .orElseGet(Collections::emptyMap);
+    }
+
+    /**
+     * Verifies that the schema has a name.
+     *
+     * @param schema the schema to validate
+     * @throws SchemaNotFoundException if the schema's name is not present
+     */
+    @Override
+    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
+        final SchemaIdentifier identifier = schema.getIdentifier();
+        final boolean nameKnown = identifier.getName().isPresent();
+        if (nameKnown) {
+            return;
+        }
+
+        throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because the Schema Name is not known");
+    }
+
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return REQUIRED_SCHEMA_FIELDS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
new file mode 100644
index 0000000..bc21c1d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * A {@link SchemaAccessStrategy} that evaluates a configured schema-name property against a FlowFile
+ * and looks the resulting name up in a {@link SchemaRegistry}.
+ */
+public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
+    private final Set<SchemaField> schemaFields;
+
+    private final SchemaRegistry schemaRegistry;
+    private final PropertyValue schemaNamePropertyValue;
+
+    /**
+     * @param schemaRegistry the registry used to look schemas up; may be {@code null}, in which case only
+     *            {@link SchemaField#SCHEMA_NAME} is reported as supplied and {@link #getSchema} always fails
+     * @param schemaNamePropertyValue the property whose value, evaluated against a FlowFile, yields the schema name
+     */
+    public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, final PropertyValue schemaNamePropertyValue) {
+        this.schemaRegistry = schemaRegistry;
+        this.schemaNamePropertyValue = schemaNamePropertyValue;
+
+        schemaFields = new HashSet<>();
+        schemaFields.add(SchemaField.SCHEMA_NAME);
+        schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
+    }
+
+    /**
+     * Resolves the schema whose name is obtained by evaluating the schema-name property against the FlowFile.
+     *
+     * @param flowFile the FlowFile whose attributes are used when evaluating the schema-name property
+     * @param contentStream unused by this strategy
+     * @param context unused by this strategy
+     * @return the schema returned by the registry for the evaluated name; never {@code null}
+     * @throws SchemaNotFoundException if the evaluated name is missing or blank, if no registry is configured,
+     *             if the registry lookup fails, or if the registry has no schema with that name
+     */
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final ConfigurationContext context) throws SchemaNotFoundException {
+        final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
+        // Guard against a null value as well as a blank one, so callers get the documented exception rather than an NPE.
+        if (schemaName == null || schemaName.trim().isEmpty()) {
+            throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name.");
+        }
+
+        // The constructor tolerates a null registry, so report that case explicitly instead of via a wrapped NPE.
+        if (schemaRegistry == null) {
+            throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' because no Schema Registry is configured");
+        }
+
+        final RecordSchema recordSchema;
+        try {
+            recordSchema = schemaRegistry.retrieveSchema(schemaName);
+        } catch (final Exception e) {
+            throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' from the configured Schema Registry", e);
+        }
+
+        // Checked outside the try block so this specific message is not caught and re-wrapped by the handler above.
+        if (recordSchema == null) {
+            throw new SchemaNotFoundException("Could not find a schema with name '" + schemaName + "' in the configured Schema Registry");
+        }
+
+        return recordSchema;
+    }
+
+    /**
+     * @return {@link SchemaField#SCHEMA_NAME} plus whatever fields the configured registry (if any) reported
+     *         as supplied at construction time
+     */
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
new file mode 100644
index 0000000..f39bdca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * A {@link SchemaAccessWriter} that conveys a schema by placing its text in an attribute
+ * named {@code <schema format>.schema}. Nothing is written to the content stream.
+ */
+public class SchemaTextAsAttribute implements SchemaAccessWriter {
+    private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
+
+    /**
+     * Writes nothing: this writer conveys the schema through {@link #getAttributes(RecordSchema)} only.
+     *
+     * @param schema the schema of the records being written (unused)
+     * @param out the stream that receives the record content (left untouched)
+     */
+    @Override
+    public void writeHeader(final RecordSchema schema, final OutputStream out) {
+    }
+
+    /**
+     * Builds the attribute that carries the schema text.
+     *
+     * @param schema the schema whose text and text format are read
+     * @return a single-entry map from {@code <format>.schema} to the schema text, or an empty map when the
+     *         schema lacks either its text or its text format
+     */
+    @Override
+    public Map<String, String> getAttributes(final RecordSchema schema) {
+        final Optional<String> textFormatOption = schema.getSchemaFormat();
+        final Optional<String> textOption = schema.getSchemaText();
+
+        // Avoid an unchecked Optional.get(): a schema that was not run through validateSchema() yields no
+        // attributes instead of a NoSuchElementException.
+        if (!textFormatOption.isPresent() || !textOption.isPresent()) {
+            return Collections.emptyMap();
+        }
+
+        return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get());
+    }
+
+    /**
+     * Checks that the schema carries both a text format and text, which this writer needs.
+     *
+     * @param schema the schema to check
+     * @throws SchemaNotFoundException if the schema's text format or its text is not present
+     */
+    @Override
+    public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
+        final Optional<String> textFormatOption = schema.getSchemaFormat();
+        if (!textFormatOption.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present");
+        }
+
+        final Optional<String> textOption = schema.getSchemaText();
+        if (!textOption.isPresent()) {
+            throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present");
+        }
+    }
+
+    /**
+     * @return {@link SchemaField#SCHEMA_TEXT} and {@link SchemaField#SCHEMA_TEXT_FORMAT}
+     */
+    @Override
+    public Set<SchemaField> getRequiredSchemaFields() {
+        return schemaFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeTextRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeTextRecordSetWriter.java
index 5545090..2260c2e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeTextRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DateTimeTextRecordSetWriter.java
@@ -17,15 +17,14 @@
 
 package org.apache.nifi.serialization;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 
-public abstract class DateTimeTextRecordSetWriter extends AbstractControllerService {
+public abstract class DateTimeTextRecordSetWriter extends SchemaRegistryRecordSetWriter {
 
     private volatile String dateFormat;
     private volatile String timeFormat;
@@ -33,7 +32,11 @@ public abstract class DateTimeTextRecordSetWriter extends AbstractControllerServ
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Arrays.asList(DateTimeUtils.DATE_FORMAT, DateTimeUtils.TIME_FORMAT, DateTimeUtils.TIMESTAMP_FORMAT);
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(DateTimeUtils.DATE_FORMAT);
+        properties.add(DateTimeUtils.TIME_FORMAT);
+        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+        return properties;
     }
 
     @OnEnabled

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordReader.java
deleted file mode 100644
index ee25e64..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordReader.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public abstract class SchemaRegistryRecordReader extends AbstractControllerService {
-
-    protected static final PropertyDescriptor REQUIRED_SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
-        .name("Schema Registry")
-        .description("Specifies the Controller Service to use for the Schema Registry")
-        .identifiesControllerService(SchemaRegistry.class)
-        .required(true)
-        .build();
-
-    protected static final PropertyDescriptor OPTIONAL_SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(REQUIRED_SCHEMA_REGISTRY)
-        .required(false)
-        .build();
-
-    protected static final PropertyDescriptor REQUIRED_SCHEMA_NAME = new PropertyDescriptor.Builder()
-        .name("Schema Name")
-        .description("Name of the Schema that is stored in the Schema Registry")
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .required(true)
-        .build();
-
-    protected static final PropertyDescriptor OPTIONAL_SCHEMA_NAME = new PropertyDescriptor.Builder()
-        .fromPropertyDescriptor(REQUIRED_SCHEMA_NAME)
-        .required(false)
-        .build();
-
-
-    private volatile SchemaRegistry schemaRegistry;
-    private volatile PropertyValue schemaName;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>(2);
-        if (isSchemaRequired()) {
-            properties.add(REQUIRED_SCHEMA_REGISTRY);
-            properties.add(REQUIRED_SCHEMA_NAME);
-        } else {
-            properties.add(OPTIONAL_SCHEMA_REGISTRY);
-            properties.add(OPTIONAL_SCHEMA_NAME);
-        }
-
-        return properties;
-    }
-
-    @OnEnabled
-    public void storeRegistryValues(final ConfigurationContext context) {
-        schemaRegistry = context.getProperty(REQUIRED_SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
-        schemaName = context.getProperty(REQUIRED_SCHEMA_NAME);
-    }
-
-    public RecordSchema getSchema(final FlowFile flowFile) {
-        final String evaluatedSchemaName = schemaName.evaluateAttributeExpressions(flowFile).getValue();
-        final RecordSchema schema = schemaRegistry.retrieveSchema(evaluatedSchemaName);
-        return schema;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        if (validationContext.getProperty(OPTIONAL_SCHEMA_REGISTRY).isSet() && !validationContext.getProperty(OPTIONAL_SCHEMA_NAME).isSet()) {
-            return Collections.singleton(new ValidationResult.Builder()
-                .subject("Schema Registry")
-                .explanation("If the Schema Registry is configured, the Schema name must also be configured")
-                .valid(false)
-                .build());
-        }
-
-        return Collections.emptyList();
-    }
-
-    protected boolean isSchemaRequired() {
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
new file mode 100644
index 0000000..c9daded
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
+import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
+import org.apache.nifi.schema.access.SchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNameAsAttribute;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Base class for record set writers that let the user choose, through the "Schema Write Strategy"
+ * property, how a record's schema is conveyed alongside the written data.
+ */
+public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryService {
+
+    static final AllowableValue SCHEMA_NAME_ATTRIBUTE = new AllowableValue("schema-name", "Set 'schema.name' Attribute",
+        "The FlowFile will be given an attribute named 'schema.name' and this attribute will indicate the name of the schema in the Schema Registry. Note that if "
+            + "the schema for a record is not obtained from a Schema Registry, then no attribute will be added.");
+    static final AllowableValue AVRO_SCHEMA_ATTRIBUTE = new AllowableValue("full-schema-attribute", "Set 'avro.schema' Attribute",
+        "The FlowFile will be given an attribute named 'avro.schema' and this attribute will contain the Avro Schema that describes the records in the FlowFile. "
+            + "The contents of the FlowFile need not be Avro, but the text of the schema will be used.");
+    static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+        "The content of the FlowFile will contain a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+            + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+            + "as found at https://github.com/hortonworks/registry. "
+            + "This will be prepended to each FlowFile. Note that "
+            + "if the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
+    static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+        "The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
+            + "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
+
+    protected static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Schema Write Strategy")
+        .description("Specifies how the schema for a Record should be added to the data.")
+        .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+        .defaultValue(AVRO_SCHEMA_ATTRIBUTE.getValue())
+        .required(true)
+        .build();
+
+
+    private volatile ConfigurationContext configurationContext;
+    private volatile SchemaAccessWriter schemaAccessWriter;
+
+    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+
+
+    /**
+     * @return the "Schema Write Strategy" descriptor, customized with {@link #getSchemaWriteStrategyValues()} and
+     *         {@link #getDefaultSchemaWriteStrategy()}, followed by the descriptors of the superclass
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+
+        final AllowableValue[] strategies = getSchemaWriteStrategyValues().toArray(new AllowableValue[0]);
+        properties.add(new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SCHEMA_WRITE_STRATEGY)
+            .defaultValue(getDefaultSchemaWriteStrategy().getValue())
+            .allowableValues(strategies)
+            .build());
+        properties.addAll(super.getSupportedPropertyDescriptors());
+
+        return properties;
+    }
+
+    /**
+     * @return the strategy used as the property's default value; {@link #AVRO_SCHEMA_ATTRIBUTE} unless overridden
+     */
+    protected AllowableValue getDefaultSchemaWriteStrategy() {
+        return AVRO_SCHEMA_ATTRIBUTE;
+    }
+
+    /**
+     * @return the descriptor registered under the name of {@link #SCHEMA_WRITE_STRATEGY}, as resolved by
+     *         {@code getPropertyDescriptor}
+     */
+    protected PropertyDescriptor getSchemaWriteStrategyDescriptor() {
+        return getPropertyDescriptor(SCHEMA_WRITE_STRATEGY.getName());
+    }
+
+    /**
+     * Remembers the configuration context and instantiates the writer for the configured write strategy.
+     *
+     * @param context the context the service is being enabled with
+     */
+    @OnEnabled
+    public void storeSchemaWriteStrategy(final ConfigurationContext context) {
+        this.configurationContext = context;
+
+        final String writerValue = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+        this.schemaAccessWriter = getSchemaWriteStrategy(writerValue);
+    }
+
+    /**
+     * @return the context stored by {@link #storeSchemaWriteStrategy(ConfigurationContext)}
+     */
+    @Override
+    protected ConfigurationContext getConfigurationContext() {
+        return configurationContext;
+    }
+
+    /**
+     * Validates the given schema against the writer chosen when the service was enabled, then returns that writer.
+     *
+     * @param schema the schema that is about to be written
+     * @return the writer stored by {@link #storeSchemaWriteStrategy(ConfigurationContext)}
+     * @throws SchemaNotFoundException if the writer rejects the schema
+     */
+    protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema) throws SchemaNotFoundException {
+        schemaAccessWriter.validateSchema(schema);
+        return schemaAccessWriter;
+    }
+
+    /**
+     * @return the write strategies offered to the user; an unmodifiable list of the four built-in strategies
+     *         unless overridden
+     */
+    protected List<AllowableValue> getSchemaWriteStrategyValues() {
+        return strategyList;
+    }
+
+    /**
+     * Maps the value of a write-strategy {@link AllowableValue} to a new writer instance.
+     *
+     * @param allowableValue the configured strategy value, compared case-insensitively; may be {@code null}
+     * @return a new writer for the strategy, or {@code null} if the value is {@code null} or not recognized
+     */
+    protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
+        // The constant is the receiver of equalsIgnoreCase so that a null argument compares unequal instead of throwing.
+        if (SCHEMA_NAME_ATTRIBUTE.getValue().equalsIgnoreCase(allowableValue)) {
+            return new SchemaNameAsAttribute();
+        } else if (AVRO_SCHEMA_ATTRIBUTE.getValue().equalsIgnoreCase(allowableValue)) {
+            return new SchemaTextAsAttribute();
+        } else if (HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(allowableValue)) {
+            return new HortonworksEncodedSchemaReferenceWriter();
+        } else if (HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(allowableValue)) {
+            return new HortonworksAttributeSchemaReferenceWriter();
+        }
+
+        return null;
+    }
+
+    /**
+     * Determines which schema fields the configured write strategy needs.
+     *
+     * @param validationContext the context holding the configured write strategy
+     * @return the fields required by the configured writer, or an empty set if the configured value does not map
+     *         to a writer
+     */
+    protected Set<SchemaField> getRequiredSchemaFields(final ValidationContext validationContext) {
+        final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
+        final SchemaAccessWriter writer = getSchemaWriteStrategy(writeStrategyValue);
+        // getSchemaWriteStrategy returns null for an unrecognized value; do not let validation fail with an NPE.
+        if (writer == null) {
+            return Collections.emptySet();
+        }
+
+        return writer.getRequiredSchemaFields();
+    }
+
+    /**
+     * Adds, on top of the superclass's validation, a check that every schema field required by the configured
+     * write strategy is among the fields returned by {@code getSuppliedSchemaFields}.
+     *
+     * @param validationContext the context to validate
+     * @return the superclass's results plus at most one result describing the first missing field
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final Set<SchemaField> suppliedFields = getSuppliedSchemaFields(validationContext);
+        final Set<SchemaField> requiredFields = getRequiredSchemaFields(validationContext);
+
+        // Copy before removing so the writer's own set of required fields is never mutated.
+        final Set<SchemaField> missingFields = new HashSet<>(requiredFields);
+        missingFields.removeAll(suppliedFields);
+
+        if (!missingFields.isEmpty()) {
+            results.add(new ValidationResult.Builder()
+                .subject("Schema Access Strategy")
+                .valid(false)
+                .explanation("The configured Schema Write Strategy requires the " + missingFields.iterator().next()
+                    + " but the configured Schema Access Strategy does not provide this information in conjunction with the selected Schema Registry. "
+                    + "This Schema Access Strategy, as configured, cannot be used in conjunction with this Schema Write Strategy.")
+                .build());
+        }
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
new file mode 100644
index 0000000..0988935
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.AvroSchemaTextStrategy;
+import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceStrategy;
+import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceStrategy;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNamePropertyStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Base class for Controller Services that need to obtain a {@link RecordSchema}.
+ * <p>
+ * It exposes the 'Schema Access Strategy', 'Schema Registry', 'Schema Name' and 'Schema Text' properties,
+ * validates that a Schema Registry is configured whenever the selected strategy needs one, and builds the
+ * {@link SchemaAccessStrategy} matching the configured strategy when the service is enabled.
+ * Subclasses may narrow or extend the offered strategies by overriding {@link #getSchemaAccessStrategyValues()},
+ * {@link #getDefaultSchemaAccessStrategy()} and the {@code getSchemaAccessStrategy(...)} factory methods.
+ */
+public abstract class SchemaRegistryService extends AbstractControllerService {
+
+    static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
+        "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");
+    static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
+        "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
+            + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
+    static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
+        "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
+            + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
+            + "found at https://github.com/hortonworks/registry");
+    static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
+        "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+
+    protected static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+        .name("Schema Registry")
+        .description("Specifies the Controller Service to use for the Schema Registry")
+        .identifiesControllerService(SchemaRegistry.class)
+        .required(false)
+        .build();
+
+    protected static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Schema Access Strategy")
+        .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
+        .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+        .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("Schema Name")
+        .description("Specifies the name of the schema to lookup in the Schema Registry property")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("${schema.name}")
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
+        .name("schema-text")
+        .displayName("Schema Text")
+        .description("The text of an Avro-formatted Schema")
+        .addValidator(new AvroSchemaValidator())
+        .expressionLanguageSupported(true)
+        .defaultValue("${avro.schema}")
+        .required(false)
+        .build();
+
+
+    // Both fields are only assigned in storeSchemaAccessStrategy(ConfigurationContext); they are null until that method has run.
+    private volatile ConfigurationContext configurationContext;
+    private volatile SchemaAccessStrategy schemaAccessStrategy;
+
+    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+
+    /**
+     * Looks up the 'Schema Access Strategy' descriptor by name rather than using the {@link #SCHEMA_ACCESS_STRATEGY}
+     * constant directly, because {@link #getSupportedPropertyDescriptors()} registers a copy of that descriptor whose
+     * allowable values and default may have been customized by a subclass.
+     *
+     * @return the descriptor registered under the name of {@link #SCHEMA_ACCESS_STRATEGY}
+     */
+    private PropertyDescriptor getSchemaAccessStrategyDescriptor() {
+        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
+    }
+
+    /**
+     * Builds the list of supported properties: the Schema Access Strategy (restricted to
+     * {@link #getSchemaAccessStrategyValues()} and defaulting to {@link #getDefaultSchemaAccessStrategy()}),
+     * followed by the Schema Registry, Schema Name and Schema Text properties.
+     *
+     * @return a new, mutable list of property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        // Four descriptors are added below.
+        final List<PropertyDescriptor> properties = new ArrayList<>(4);
+
+        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
+        properties.add(new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
+            .allowableValues(strategies)
+            .defaultValue(getDefaultSchemaAccessStrategy().getValue())
+            .build());
+
+        properties.add(SCHEMA_REGISTRY);
+        properties.add(SCHEMA_NAME);
+        properties.add(SCHEMA_TEXT);
+
+        return properties;
+    }
+
+    /**
+     * @return the strategy used as the default value of the 'Schema Access Strategy' property
+     */
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SCHEMA_NAME_PROPERTY;
+    }
+
+    /**
+     * Captures the configuration and creates the Schema Access Strategy when the service is enabled.
+     *
+     * @param context the configuration the service is being enabled with
+     */
+    @OnEnabled
+    public void storeSchemaAccessStrategy(final ConfigurationContext context) {
+        // Must be stored first: getSchemaAccessStrategy(String, SchemaRegistry) reads it through getConfigurationContext().
+        this.configurationContext = context;
+
+        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+
+        final PropertyDescriptor descriptor = getSchemaAccessStrategyDescriptor();
+        final String schemaAccess = context.getProperty(descriptor).getValue();
+        this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, schemaRegistry);
+    }
+
+    /**
+     * @return the context stored by {@link #storeSchemaAccessStrategy(ConfigurationContext)}, or {@code null} if that method has not run yet
+     */
+    protected ConfigurationContext getConfigurationContext() {
+        return configurationContext;
+    }
+
+    /**
+     * @return the strategy created by {@link #storeSchemaAccessStrategy(ConfigurationContext)}; may be {@code null} if that method
+     *         has not run yet or the configured value did not map to a strategy
+     */
+    protected SchemaAccessStrategy getSchemaAccessStrategy() {
+        return schemaAccessStrategy;
+    }
+
+    /**
+     * Obtains the schema for the given FlowFile by delegating to the configured Schema Access Strategy.
+     *
+     * @param flowFile the FlowFile whose schema is wanted
+     * @param contentStream the content of the FlowFile
+     * @return the schema returned by the Schema Access Strategy
+     * @throws SchemaNotFoundException if no Schema Access Strategy is available, or if the strategy itself throws it
+     * @throws IOException if the strategy throws it
+     */
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
+        if (accessStrategy == null) {
+            // Report the missing strategy through the declared exception instead of failing with a NullPointerException.
+            throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
+        }
+
+        return accessStrategy.getSchema(flowFile, contentStream, configurationContext);
+    }
+
+    /**
+     * @param schemaAccessValue the value of the 'Schema Access Strategy' property
+     * @return the display name of the matching allowable value (compared ignoring case), or {@code null} if none matches
+     */
+    private String getSchemaAccessStrategyName(final String schemaAccessValue) {
+        for (final AllowableValue allowableValue : getSchemaAccessStrategyDescriptor().getAllowableValues()) {
+            if (allowableValue.getValue().equalsIgnoreCase(schemaAccessValue)) {
+                return allowableValue.getDisplayName();
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param schemaAccessValue the value of the 'Schema Access Strategy' property; may be {@code null}
+     * @return {@code true} for the 'schema-name' strategy and for both HWX schema reference strategies
+     */
+    private boolean isSchemaRegistryRequired(final String schemaAccessValue) {
+        return HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue) || SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue)
+            || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue);
+    }
+
+    /**
+     * Verifies that the 'Schema Registry' property is set whenever the selected Schema Access Strategy requires one.
+     *
+     * @param validationContext the context providing the property values to validate
+     * @return a single invalid result if the registry is required but not set, otherwise an empty collection;
+     *         the returned collection is immutable
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final String accessStrategyValue = validationContext.getProperty(getSchemaAccessStrategyDescriptor()).getValue();
+        if (isSchemaRegistryRequired(accessStrategyValue)) {
+            final boolean registrySet = validationContext.getProperty(SCHEMA_REGISTRY).isSet();
+            if (!registrySet) {
+                final String displayName = getSchemaAccessStrategyName(accessStrategyValue);
+                // Fall back to the raw value so the explanation never reads "The 'null' Schema Access Strategy".
+                final String schemaAccessStrategyName = displayName == null ? accessStrategyValue : displayName;
+
+                return Collections.singleton(new ValidationResult.Builder()
+                    .subject("Schema Registry")
+                    .explanation("The '" + schemaAccessStrategyName + "' Schema Access Strategy requires that the Schema Registry property be set.")
+                    .valid(false)
+                    .build());
+            }
+        }
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return the strategies offered as allowable values of the 'Schema Access Strategy' property; the list is unmodifiable
+     */
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        return strategyList;
+    }
+
+    /**
+     * Determines which schema fields the currently selected Schema Access Strategy supplies.
+     *
+     * @param validationContext the context providing the property values
+     * @return the fields reported by the strategy, or an empty set if the selected value does not map to a strategy
+     */
+    protected Set<SchemaField> getSuppliedSchemaFields(final ValidationContext validationContext) {
+        final String accessStrategyValue = validationContext.getProperty(getSchemaAccessStrategyDescriptor()).getValue();
+        final SchemaRegistry schemaRegistry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext);
+
+        if (accessStrategy == null) {
+            // The factory returns null for values it does not recognize; an unknown strategy supplies nothing.
+            return Collections.emptySet();
+        }
+
+        return accessStrategy.getSuppliedSchemaFields();
+    }
+
+    /**
+     * Creates the Schema Access Strategy for the given property value, reading the 'Schema Name' and 'Schema Text'
+     * properties from the context returned by {@link #getConfigurationContext()}.
+     *
+     * @param allowableValue the value of the 'Schema Access Strategy' property (compared ignoring case); may be {@code null}
+     * @param schemaRegistry the Schema Registry to hand to strategies that use one
+     * @return the matching strategy, or {@code null} if the value matches none of the strategies defined by this class
+     */
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry) {
+        // Constants are placed on the left so that a null value falls through to the final 'return null'.
+        if (SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(allowableValue)) {
+            return new SchemaNamePropertyStrategy(schemaRegistry, getConfigurationContext().getProperty(SCHEMA_NAME));
+        } else if (SCHEMA_TEXT_PROPERTY.getValue().equalsIgnoreCase(allowableValue)) {
+            return new AvroSchemaTextStrategy(getConfigurationContext().getProperty(SCHEMA_TEXT));
+        } else if (HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(allowableValue)) {
+            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+        } else if (HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(allowableValue)) {
+            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        }
+
+        return null;
+    }
+
+    /**
+     * Creates the Schema Access Strategy for the given property value, reading the 'Schema Name' and 'Schema Text'
+     * properties from the supplied validation context instead of the stored configuration context.
+     *
+     * @param allowableValue the value of the 'Schema Access Strategy' property (compared ignoring case); may be {@code null}
+     * @param schemaRegistry the Schema Registry to hand to strategies that use one
+     * @param context the validation context to read property values from
+     * @return the matching strategy, or {@code null} if the value matches none of the strategies defined by this class
+     */
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+        // Constants are placed on the left so that a null value falls through to the final 'return null'.
+        if (SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(allowableValue)) {
+            return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+        } else if (SCHEMA_TEXT_PROPERTY.getValue().equalsIgnoreCase(allowableValue)) {
+            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
+        } else if (HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(allowableValue)) {
+            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+        } else if (HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(allowableValue)) {
+            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        }
+
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
index 598a8c4..cb69444 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.text;
 
+import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,6 +29,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -35,13 +37,12 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 @Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
 @CapabilityDescription("Writes the contents of a RecordSet as free-form text. The configured "
-    + "text is able to make use of the Expression Language to reference each of the columns that are available "
+    + "text is able to make use of the Expression Language to reference each of the fields that are available "
     + "in a Record. Each record in the RecordSet will be separated by a single newline character.")
 public class FreeFormTextRecordSetWriter extends AbstractControllerService implements RecordSetWriterFactory {
     static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
         .name("Text")
-        .description("The text to use when writing the results. This property will evaluate the Expression Language using any of the columns available to the Result Set. For example, if the "
-            + "following SQL Query is used: \"SELECT Name, COUNT(*) AS Count\" then the Expression can reference \"Name\" and \"Count\", such as \"${Name:toUpper()} ${Count:minus(1)}\"")
+        .description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.")
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(true)
         .required(true)
@@ -73,7 +74,7 @@ public class FreeFormTextRecordSetWriter extends AbstractControllerService imple
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger) {
+    public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
         return new FreeFormTextWriter(textValue, characterSet);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 781f41f..7fdc7a4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -20,8 +20,10 @@ package org.apache.nifi.text;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.components.PropertyValue;
@@ -29,6 +31,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
@@ -48,7 +51,7 @@ public class FreeFormTextWriter implements RecordSetWriter {
 
         try {
             final RecordSchema schema = recordSet.getSchema();
-            final String[] colNames = getColumnNames(schema);
+            final List<String> colNames = getColumnNames(schema);
 
             Record record;
             while ((record = recordSet.next()) != null) {
@@ -62,11 +65,13 @@ public class FreeFormTextWriter implements RecordSetWriter {
         return WriteResult.of(count, Collections.emptyMap());
     }
 
-    private String[] getColumnNames(final RecordSchema schema) {
-        final int numCols = schema.getFieldCount();
-        final String[] columnNames = new String[numCols];
-        for (int i = 0; i < numCols; i++) {
-            columnNames[i] = schema.getField(i).getFieldName();
+    private List<String> getColumnNames(final RecordSchema schema) {
+        final List<String> columnNames = new ArrayList<>();
+        for (final RecordField field : schema.getFields()) {
+            columnNames.add(field.getFieldName());
+            for (final String alias : field.getAliases()) {
+                columnNames.add(alias);
+            }
         }
 
         return columnNames;
@@ -78,11 +83,11 @@ public class FreeFormTextWriter implements RecordSetWriter {
         return WriteResult.of(1, Collections.emptyMap());
     }
 
-    private void write(final Record record, final OutputStream out, final String[] columnNames) throws IOException {
-        final int numCols = columnNames.length;
+    private void write(final Record record, final OutputStream out, final List<String> columnNames) throws IOException {
+        final int numCols = columnNames.size();
         final Map<String, String> values = new HashMap<>(numCols);
         for (int i = 0; i < numCols; i++) {
-            final String columnName = columnNames[i];
+            final String columnName = columnNames.get(i);
             final String columnValue = record.getAsString(columnName);
             values.put(columnName, columnValue);
         }