You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:41 UTC

[18/19] nifi git commit: NIFI-1280 added support for RecordSchema in SchemaRegistry

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java
deleted file mode 100644
index f54a4b5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-@Tags({ "registry", "schema", "avro", "json", "transform" })
-@CapabilityDescription("Transforms JSON content of the Flow File to Avro using the schema provided by the Schema Registry Service.")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-public final class TransformJsonToAvro extends AbstractContentTransformer {
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
-        GenericRecord avroRecord = JsonUtils.read(in, schema);
-        AvroUtils.write(avroRecord, out);
-        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java
deleted file mode 100644
index c026570..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-@Tags({ "registry", "schema", "csv", "json", "transform" })
-@CapabilityDescription("Transforms JSON content of the Flow File to CSV using the schema provided by the Schema Registry Service.")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-public final class TransformJsonToCSV extends AbstractCSVTransformer {
-
-    /**
-     *
-     */
-    @Override
-    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
-        GenericRecord avroRecord = JsonUtils.read(in, schema);
-        CSVUtils.write(avroRecord, this.delimiter, out);
-        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index 0bb067e..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.schemaregistry.processors.TransformJsonToAvro
-org.apache.nifi.schemaregistry.processors.TransformAvroToJson
-org.apache.nifi.schemaregistry.processors.TransformCSVToAvro
-org.apache.nifi.schemaregistry.processors.TransformCSVToJson
-org.apache.nifi.schemaregistry.processors.TransformAvroToCSV
-org.apache.nifi.schemaregistry.processors.TransformJsonToCSV
-org.apache.nifi.schemaregistry.processors.ExtractAvroFields
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
deleted file mode 100644
index 058af62..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import static org.junit.Assume.assumeFalse;
-
-@RunWith(JUnitParamsRunner.class)
-public class TransformersTest {
-
-    private final ClassLoader classLoader = getClass().getClassLoader();
-
-    @Test
-    public void validateCSVtoAvroPair() throws Exception {
-        String data = "John Dow|13|blue";
-        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
-                + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"favorite_number\",  \"type\": \"int\"}, "
-                + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
-
-        Schema schema = new Schema.Parser().parse(fooSchemaText);
-
-        // CSV -> AVRO -> CSV
-        ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
-        GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        AvroUtils.write(record, out);
-        byte[] avro = out.toByteArray();
-
-        in = new ByteArrayInputStream(avro);
-        record = AvroUtils.read(in, schema);
-        out = new ByteArrayOutputStream();
-        CSVUtils.write(record, '|', out);
-        byte[] csv = out.toByteArray();
-        assertEquals(data, new String(csv, StandardCharsets.UTF_8));
-    }
-
-    @Test
-    public void validateCSVtoJsonPair() throws Exception {
-        String data = "John Dow|13|blue";
-        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
-                + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"favorite_number\",  \"type\": \"int\"}, "
-                + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
-
-        Schema schema = new Schema.Parser().parse(fooSchemaText);
-
-        // CSV -> JSON -> CSV
-        ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
-        GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        JsonUtils.write(record, out);
-        byte[] json = out.toByteArray();
-
-        assertEquals("{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}", new String(json, StandardCharsets.UTF_8));
-
-        in = new ByteArrayInputStream(json);
-        record = JsonUtils.read(in, schema);
-        out = new ByteArrayOutputStream();
-        CSVUtils.write(record, '|', out);
-        byte[] csv = out.toByteArray();
-        assertEquals(data, new String(csv, StandardCharsets.UTF_8));
-    }
-
-    @Test
-    public void validateJsonToAvroPair() throws Exception {
-        String data = "{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}";
-        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
-                + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"favorite_number\",  \"type\": \"int\"}, "
-                + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
-
-        Schema schema = new Schema.Parser().parse(fooSchemaText);
-
-        // JSON -> AVRO -> JSON
-        ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
-        GenericRecord record = JsonUtils.read(in, schema);
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        AvroUtils.write(record, out);
-        byte[] avro = out.toByteArray();
-
-        in = new ByteArrayInputStream(avro);
-        record = AvroUtils.read(in, schema);
-        out = new ByteArrayOutputStream();
-        JsonUtils.write(record, out);
-        byte[] csv = out.toByteArray();
-        assertEquals(data, new String(csv, StandardCharsets.UTF_8));
-    }
-
-    @Test
-    @Parameters({"input_csv/union_null_last_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_last_field_with_default.txt",
-            "input_csv/union_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_with_default.txt",
-            "input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt",
-            "input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt",
-            "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt",
-            "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"})
-    public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOuput) throws Exception {
-        assumeFalse(isWindowsEnvironment());
-        final String data = getResourceAsString(inputCSVFileName);
-        final String schemaText = getResourceAsString(inputAvroSchema);
-        final String result = getResourceAsString(expectedOuput);
-        csvRoundTrip(data, schemaText, result);
-    }
-
-    private boolean isWindowsEnvironment() {
-        return System.getProperty("os.name").toLowerCase().startsWith("windows");
-    }
-
-    @Test
-    @Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt",
-            "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"})
-    public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema)  {
-        assumeFalse(isWindowsEnvironment());
-        try {
-            final String data = getResourceAsString(inputCSVFileName);
-            final String schemaText = getResourceAsString(inputAvroSchema);
-            Schema schema = new Schema.Parser().parse(schemaText);
-
-            ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
-            CSVUtils.read(in, '|', schema, '\"');
-        }catch (IOException ioe){
-            assertTrue(false);
-        }catch(IllegalArgumentException iae){
-            assertTrue(true);
-        }
-    }
-
-    @Test
-    public void testCSVRoundTrip() throws IOException {
-        assumeFalse(isWindowsEnvironment());
-        NumberFormat numberFormat = DecimalFormat.getInstance();
-        numberFormat.setGroupingUsed(false);
-        ((DecimalFormat) numberFormat).setParseBigDecimal(true);
-
-        //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt",
-        String decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(11234567.89));
-        String data = getResourceAsString("input_csv/decimal_logicalType.txt");
-        String schemaText = getResourceAsString("input_avro/decimal_logicalType_invalid_scale_with_default.txt");
-        csvRoundTrip(data, schemaText, decimalLogicalType);
-
-        // needs to be set now because scale < precision
-        numberFormat.setMaximumIntegerDigits(10);
-        numberFormat.setMaximumFractionDigits(3);
-        numberFormat.setMinimumFractionDigits(3);
-
-        //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt",
-        decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(11234567.890));
-        data = getResourceAsString("input_csv/decimal_logicalType.txt");
-        schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_no_default.txt");
-
-        //"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt",
-        decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(0.000));
-        data = getResourceAsString("input_csv/decimal_logicalType_missing_value.txt");
-        schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_default.txt");
-        csvRoundTrip(data, schemaText, decimalLogicalType);
-    }
-
-    private void csvRoundTrip(final String data, final String schemaText, final String result) {
-        Schema schema = new Schema.Parser().parse(schemaText);
-
-        // CSV -> AVRO -> CSV
-        ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
-        GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        AvroUtils.write(record, out);
-        byte[] avro = out.toByteArray();
-
-        in = new ByteArrayInputStream(avro);
-        record = AvroUtils.read(in, schema);
-        out = new ByteArrayOutputStream();
-        CSVUtils.write(record, '|', out);
-        byte[] csv = out.toByteArray();
-        assertEquals(result, new String(csv, StandardCharsets.UTF_8));
-    }
-
-    /**
-     * Simple wrapper around getting the test resource file that is used by the above test cases
-     *
-     * @param fileName - the filename of the file to read
-     * @return A string that contains the body of the file.
-     * @throws IOException - if an error occurs reading the file.
-     */
-    private String getResourceAsString(String fileName) throws IOException {
-        return new String(Files.readAllBytes(FileSystems.getDefault().getPath(classLoader.getResource(fileName).getPath())));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt
deleted file mode 100644
index 1a53f85..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|11234567.890
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt
deleted file mode 100644
index 9506ad4..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|11234567.89
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt
deleted file mode 100644
index 2309e71..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|0.000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt
deleted file mode 100644
index 3a9689c..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt
deleted file mode 100644
index 77f353f..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt
+++ /dev/null
@@ -1 +0,0 @@
-"this is a simple string."|10|21474836470|1.7976931348623157E308|true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt
deleted file mode 100644
index 095f81e..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"default_string"|1234|21474836470|1.7976931348623157E308|true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt
deleted file mode 100644
index 83cbf75..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt
deleted file mode 100644
index 1b03c97..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|21474|blue
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt
deleted file mode 100644
index 9c7abb5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|blue
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt
deleted file mode 100644
index 54ba8b1..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-	"name": "trx_table",
-	"type": "record",
-	"fields": [
-	 {
-		"name": "transactionid",
-		"type": ["string", "null"]
-	}, {
-		"name": "amount",
-		"type": "bytes",
-		"logicalType": "decimal",
-		"precision": 10,
-		"scale": 13,
-		"default": 0.0
-	}]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt
deleted file mode 100644
index 8385fb1..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-	"name": "trx_table",
-	"type": "record",
-	"fields": [
-	 {
-		"name": "transactionid",
-		"type": ["string", "null"]
-	}, {
-		"name": "amount",
-		"type": "bytes",
-		"logicalType": "decimal",
-		"precision": 10,
-		"scale": 3,
-		"default": 0.0
-	}]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt
deleted file mode 100644
index 9878590..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-	"name": "trx_table",
-	"type": "record",
-	"fields": [
-	 {
-		"name": "transactionid",
-		"type": ["string", "null"]
-	}, {
-		"name": "amount",
-		"type": "bytes",
-		"logicalType": "decimal",
-		"precision": 10,
-		"scale": 3
-	}]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt
deleted file mode 100644
index 934a53c..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-    "type":"record",
-    "name":"basic_primitive_type_check",
-    "fields":[
-      {"name":"string","type":"string"},
-      {"name":"integer","type":"int"},
-      {"name":"long","type":"long"},
-      {"name":"double","type":"double"},
-      {"name":"boolean","type":"boolean"}
-    ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt
deleted file mode 100644
index abc80ca..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-    "type":"record",
-    "name":"basic_primitive_type_check",
-    "fields":[
-      {"name":"string","type":["null","string"],"default":null},
-      {"name":"integer","type":["null","int"],"default":null},
-      {"name":"long","type":["null","long"],"default":null},
-      {"name":"double","type":["null","double"],"default":null},
-      {"name":"boolean","type":["null","boolean"],"default":null}
-    ]
-  }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt
deleted file mode 100644
index b3ea951..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-    "type":"record",
-    "name":"basic_primitive_type_check",
-    "fields":[
-      {"name":"string","type":"string","default":"default_string"},
-      {"name":"integer","type":"int","default":1234},
-      {"name":"long","type":"long","default":21474836470},
-      {"name":"double","type":"double","default":1.7976931348623157E308},
-      {"name":"boolean","type":"boolean","default":true}
-    ]
-  }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt
deleted file mode 100644
index e8f0e28..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-    "type":"record",
-    "name":"basic_primitive_type_check",
-    "fields":[
-      {"name":"string","type":"string","default":1234},
-      {"name":"integer","type":"int","default":"mismatch_int"},
-      {"name":"long","type":"long","default":"mismatch_long"},
-      {"name":"double","type":"double","default":"mismatch_double"},
-      {"name":"boolean","type":"boolean","default":"mismatch_boolean"}
-    ]
-  }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt
deleted file mode 100644
index 442a3a4..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-	"namespace": "example.avro",
-	"type": "record",
-	"name": "User",
-	"fields": [{
-		"name": "name",
-		"type": "string",
-		"default": "default_name"
-	}, {
-		"name": "favorite_number",
-		"type": "int",
-		"default": 21474
-	}, {
-		"name": "favorite_color",
-		"type": ["null", "string"],
-		"default": null
-	}]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt
deleted file mode 100644
index 5222074..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-{
-	"namespace": "example.avro",
-	"type": "record",
-	"name": "User",
-	"fields": [{
-		"name": "name",
-		"type": "string",
-		"default": "default_name"
-	}, {
-		"name": "favorite_number",
-		"type": "int",
-		"default": "mismatched_int_default"
-	}, {
-		"name": "favorite_color",
-		"type": ["null", "string"],
-		"default": null
-	}]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt
deleted file mode 100644
index 1a53f85..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|11234567.890
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt
deleted file mode 100644
index 1ee2a9b..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt
deleted file mode 100644
index 77f353f..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt
+++ /dev/null
@@ -1 +0,0 @@
-"this is a simple string."|10|21474836470|1.7976931348623157E308|true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt
deleted file mode 100644
index b60c01b..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"default_string"||21474836470||true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt
deleted file mode 100644
index 83cbf75..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt
deleted file mode 100644
index 5a706ac..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew||blue
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt
deleted file mode 100644
index 9c7abb5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|blue
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt
deleted file mode 100644
index 5a706ac..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew||blue
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index d99dc64..3cfae30 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -23,6 +23,18 @@
 	<packaging>jar</packaging>
 
 	<dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record-serialization-service-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.nifi</groupId>
 			<artifactId>nifi-api</artifactId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
new file mode 100644
index 0000000..13b1d5d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+@Tags({"schema", "registry", "avro", "json", "csv"})
+@CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema "
+    + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
+    + "representation of the actual schema following the syntax and semantics of Avro's Schema format.")
+public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+
+    private final Map<String, String> schemaNameToSchemaMap;
+
+    private static final String LOGICAL_TYPE_DATE = "date";
+    private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
+    private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
+    private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
+    private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
+
+
+    public AvroSchemaRegistry() {
+        this.schemaNameToSchemaMap = new HashMap<>();
+    }
+
+    @OnEnabled
+    public void enable(ConfigurationContext configuratiponContext) throws InitializationException {
+        this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream()
+            .filter(propEntry -> propEntry.getKey().isDynamic())
+            .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
+    }
+
+    @Override
+    public String retrieveSchemaText(String schemaName) {
+        if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
+            throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + ".");
+        } else {
+            return this.schemaNameToSchemaMap.get(schemaName);
+        }
+    }
+
+    @Override
+    public String retrieveSchemaText(String schemaName, Map<String, String> attributes) {
+        throw new UnsupportedOperationException("This version of schema registry does not "
+            + "support this operation, since schemas are only identofied by name.");
+    }
+
+    @Override
+    @OnDisabled
+    public void close() throws Exception {
+        this.schemaNameToSchemaMap.clear();
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .required(false)
+            .addValidator(new AvroSchemaValidator())
+            .dynamic(true)
+            .expressionLanguageSupported(true)
+            .build();
+    }
+
+
+    @Override
+    public RecordSchema retrieveSchema(String schemaName) {
+        final String schemaText = this.retrieveSchemaText(schemaName);
+        final Schema schema = new Schema.Parser().parse(schemaText);
+        return createRecordSchema(schema);
+    }
+
+    /**
+     * Converts an Avro Schema to a RecordSchema
+     *
+     * @param avroSchema the Avro Schema to convert
+     * @return the Corresponding Record Schema
+     */
+    private RecordSchema createRecordSchema(final Schema avroSchema) {
+        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
+        for (final Field field : avroSchema.getFields()) {
+            final String fieldName = field.name();
+            final DataType dataType = determineDataType(field.schema());
+            recordFields.add(new RecordField(fieldName, dataType));
+        }
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+        return recordSchema;
+    }
+
+    /**
+     * Returns a DataType for the given Avro Schema
+     *
+     * @param avroSchema the Avro Schema to convert
+     * @return a Data Type that corresponds to the given Avro Schema
+     */
+    private DataType determineDataType(final Schema avroSchema) {
+        final Type avroType = avroSchema.getType();
+
+        final LogicalType logicalType = avroSchema.getLogicalType();
+        if (logicalType != null) {
+            final String logicalTypeName = logicalType.getName();
+            switch (logicalTypeName) {
+                case LOGICAL_TYPE_DATE:
+                    return RecordFieldType.DATE.getDataType();
+                case LOGICAL_TYPE_TIME_MILLIS:
+                case LOGICAL_TYPE_TIME_MICROS:
+                    return RecordFieldType.TIME.getDataType();
+                case LOGICAL_TYPE_TIMESTAMP_MILLIS:
+                case LOGICAL_TYPE_TIMESTAMP_MICROS:
+                    return RecordFieldType.TIMESTAMP.getDataType();
+            }
+        }
+
+        switch (avroType) {
+            case ARRAY:
+                return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
+            case BYTES:
+            case FIXED:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case ENUM:
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT:
+                return RecordFieldType.INT.getDataType();
+            case LONG:
+                return RecordFieldType.LONG.getDataType();
+            case RECORD: {
+                final List<Field> avroFields = avroSchema.getFields();
+                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
+
+                for (final Field field : avroFields) {
+                    final String fieldName = field.name();
+                    final Schema fieldSchema = field.schema();
+                    final DataType fieldType = determineDataType(fieldSchema);
+                    recordFields.add(new RecordField(fieldName, fieldType));
+                }
+
+                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+            }
+            case NULL:
+            case MAP:
+                return RecordFieldType.RECORD.getDataType();
+            case UNION: {
+                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
+                    .filter(s -> s.getType() != Type.NULL)
+                    .collect(Collectors.toList());
+
+                if (nonNullSubSchemas.size() == 1) {
+                    return determineDataType(nonNullSubSchemas.get(0));
+                }
+
+                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
+                for (final Schema subSchema : nonNullSubSchemas) {
+                    final DataType childDataType = determineDataType(subSchema);
+                    possibleChildTypes.add(childDataType);
+                }
+
+                return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
+            }
+        }
+
+        return null;
+    }
+
+    /*
+     * For this implementation 'attributes' argument is ignored since the underlying storage mechanisms
+     * is based strictly on key/value pairs. In other implementation additional attributes may play a role (e.g., version id,)
+     */
+    @Override
+    public RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes) {
+        return this.retrieveSchema(schemaName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
new file mode 100644
index 0000000..32b700f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schemaregistry.services;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class AvroSchemaValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        if (context.isExpressionLanguagePresent(input)) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(true)
+                .explanation("Expression Language is present")
+                .build();
+        }
+
+        try {
+            new Schema.Parser().parse(input);
+
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(true)
+                .explanation("Schema is valid")
+                .build();
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Not a valid Avro Schema: " + e.getMessage())
+                .build();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
deleted file mode 100644
index fd5d0c5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.services;
-
-import java.util.Properties;
-
-import org.apache.nifi.controller.ControllerService;
-
-/**
- * Represents {@link ControllerService} strategy to expose internal and/or
- * integrate with external Schema Registry
- */
-public interface SchemaRegistry extends ControllerService, AutoCloseable {
-
-    public static final String SCHEMA_NAME_ATTR = "schema.name";
-
-
-    /**
-     * Retrieves and returns the textual representation of the schema based on
-     * the provided name of the schema available in Schema Registry. Will throw
-     * an runtime exception if schema can not be found.
-     */
-    String retrieveSchemaText(String schemaName);
-
-    /**
-     * Retrieves and returns the textual representation of the schema based on
-     * the provided name of the schema available in Schema Registry and optional
-     * additional attributes. Will throw an runtime exception if schema can not
-     * be found.
-     */
-    String retrieveSchemaText(String schemaName, Properties attributes);
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java
deleted file mode 100644
index aaedea2..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.services;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-
-@Tags({ "schema", "registry", "avro", "json", "csv" })
-@CapabilityDescription("Provides a service for registering and accessing schemas. You can register schema "
-        + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
-        + "representation of the actual schema.")
-public class SimpleKeyValueSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
-
-    private static final List<PropertyDescriptor> propertyDescriptors;
-
-    static {
-        propertyDescriptors = Collections.emptyList();
-    }
-
-    private final Map<String, String> schemaNameToSchemaMap;
-
-    public SimpleKeyValueSchemaRegistry() {
-        this.schemaNameToSchemaMap = new HashMap<>();
-    }
-
-    @OnEnabled
-    public void enable(ConfigurationContext configuratiponContext) throws InitializationException {
-        this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream()
-                .filter(propEntry -> propEntry.getKey().isDynamic())
-                .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
-    }
-
-    /**
-     *
-     */
-    @Override
-    public String retrieveSchemaText(String schemaName) {
-        if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
-            throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + ".");
-        } else {
-            return this.schemaNameToSchemaMap.get(schemaName);
-        }
-    }
-
-    @Override
-    public String retrieveSchemaText(String schemaName, Properties attributes) {
-        throw new UnsupportedOperationException("This version of schema registry does not "
-                + "support this operation, since schemas are only identofied by name.");
-    }
-
-    @Override
-    @OnDisabled
-    public void close() throws Exception {
-        this.schemaNameToSchemaMap.clear();
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true)
-                .build();
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 1775b76..a000cd7 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry
\ No newline at end of file
+org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java
deleted file mode 100644
index 29179ab..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.services;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry;
-import org.junit.Test;
-
-public class SimpleKeyValueSchemaRegistryTest {
-
-    @Test
-    public void validateSchemaRegistrationFromrDynamicProperties() throws Exception {
-        String schemaName = "fooSchema";
-        ConfigurationContext configContext = mock(ConfigurationContext.class);
-        Map<PropertyDescriptor, String> properties = new HashMap<>();
-        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
-                .name(schemaName)
-                .dynamic(true)
-                .build();
-        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
-                + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
-                + "{\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]}, "
-                + "{\"name\": \"foo\",  \"type\": [\"int\", \"null\"]}, "
-                + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
-        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
-                .name("barSchema")
-                .dynamic(false)
-                .build();
-        properties.put(fooSchema, fooSchemaText);
-        properties.put(barSchema, "");
-        when(configContext.getProperties()).thenReturn(properties);
-        SchemaRegistry delegate = new SimpleKeyValueSchemaRegistry();
-        ((SimpleKeyValueSchemaRegistry)delegate).enable(configContext);
-
-        String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
-        assertEquals(fooSchemaText, locatedSchemaText);
-        try {
-            locatedSchemaText = delegate.retrieveSchemaText("barSchema");
-            fail();
-        } catch (Exception e) {
-            // ignore
-        }
-        delegate.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
new file mode 100644
index 0000000..929aab9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+public class TestAvroSchemaRegistry {
+
+    @Test
+    public void validateSchemaRegistrationFromrDynamicProperties() throws Exception {
+        String schemaName = "fooSchema";
+        ConfigurationContext configContext = mock(ConfigurationContext.class);
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
+            .name(schemaName)
+            .dynamic(true)
+            .build();
+        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
+            + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
+            + "{\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]}, "
+            + "{\"name\": \"foo\",  \"type\": [\"int\", \"null\"]}, "
+            + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
+        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
+            .name("barSchema")
+            .dynamic(false)
+            .build();
+        properties.put(fooSchema, fooSchemaText);
+        properties.put(barSchema, "");
+        when(configContext.getProperties()).thenReturn(properties);
+        SchemaRegistry delegate = new AvroSchemaRegistry();
+        ((AvroSchemaRegistry) delegate).enable(configContext);
+
+        String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
+        assertEquals(fooSchemaText, locatedSchemaText);
+        try {
+            locatedSchemaText = delegate.retrieveSchemaText("barSchema");
+            fail();
+        } catch (Exception e) {
+            // ignore
+        }
+        delegate.close();
+    }
+
+
+    @Test
+    public void validateRecordSchemaRetrieval() throws Exception {
+        String schemaName = "fooSchema";
+        ConfigurationContext configContext = mock(ConfigurationContext.class);
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
+            .name(schemaName)
+            .dynamic(true)
+            .build();
+        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
+            + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
+            + "{\"name\": \"favorite_number\",  \"type\": \"int\"}, "
+            + "{\"name\": \"foo\",  \"type\": \"boolean\"}, "
+            + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
+        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
+            .name("barSchema")
+            .dynamic(false)
+            .build();
+        properties.put(fooSchema, fooSchemaText);
+        properties.put(barSchema, "");
+        when(configContext.getProperties()).thenReturn(properties);
+        SchemaRegistry delegate = new AvroSchemaRegistry();
+        ((AvroSchemaRegistry) delegate).enable(configContext);
+
+        RecordSchema locatedSchema = delegate.retrieveSchema(schemaName);
+        List<RecordField> recordFields = locatedSchema.getFields();
+        assertEquals(4, recordFields.size());
+        assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(0).getDataType());
+        assertEquals("name", recordFields.get(0).getFieldName());
+        assertEquals(RecordFieldType.INT.getDataType(), recordFields.get(1).getDataType());
+        assertEquals("favorite_number", recordFields.get(1).getFieldName());
+        assertEquals(RecordFieldType.BOOLEAN.getDataType(), recordFields.get(2).getDataType());
+        assertEquals("foo", recordFields.get(2).getFieldName());
+        assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(3).getDataType());
+        assertEquals("favorite_color", recordFields.get(3).getFieldName());
+        delegate.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml
index fa6d30f..5287a02 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml
@@ -21,12 +21,7 @@
 	<packaging>pom</packaging>
 	<description>A bundle of processors that rely on external service to obtain schema.</description>
 
-	<properties>
-		<commons-lang3.version>3.0</commons-lang3.version>
-	</properties>
-
 	<modules>
-		<module>nifi-registry-processors</module>
 		<module>nifi-registry-service</module>
 		<module>nifi-registry-nar</module>
 	</modules>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index e390097..295ae96 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -290,8 +290,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-example-csv</artifactId>
-            <version>1.11.0</version>
+            <artifactId>calcite-core</artifactId>
+            <version>1.12.0</version>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
index 833a5d6..83a3d4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
@@ -98,19 +98,22 @@ import org.apache.nifi.util.StopWatch;
         + "that is selected being routed to the relationship whose name is the property name")
 public class QueryFlowFile extends AbstractProcessor {
     static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
-        .name("Record Reader")
+        .name("record-reader")
+        .displayName("Record Reader")
         .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
         .identifiesControllerService(RowRecordReaderFactory.class)
         .required(true)
         .build();
     static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
-        .name("Record Writer")
+        .name("record-writer")
+        .displayName("Record Writer")
         .description("Specifies the Controller Service to use for writing results to a FlowFile")
         .identifiesControllerService(RecordSetWriterFactory.class)
         .required(true)
         .build();
     static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
-        .name("Include Zero Record FlowFiles")
+        .name("include-zero-record-flowfiles")
+        .displayName("Include Zero Record FlowFiles")
         .description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
             + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
         .expressionLanguageSupported(false)
@@ -119,7 +122,8 @@ public class QueryFlowFile extends AbstractProcessor {
         .required(true)
         .build();
     static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
-        .name("Cache Schema")
+        .name("cache-schema")
+        .displayName("Cache Schema")
         .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
             + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
             + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
@@ -391,7 +395,7 @@ public class QueryFlowFile extends AbstractProcessor {
 
         final Supplier<CalciteConnection> connectionSupplier = () -> {
             final Properties properties = new Properties();
-            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());
+            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
 
             try {
                 final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
@@ -491,6 +495,15 @@ public class QueryFlowFile extends AbstractProcessor {
     private static class SqlValidator implements Validator {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(true)
+                    .explanation("Expression Language Present")
+                    .build();
+            }
+
             final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
             final SqlParser parser = SqlParser.create(substituted);
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
index 1a62d14..7daa002 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
@@ -99,10 +99,6 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
         }
 
         // Create a new Object array that contains only the desired fields.
-        if (row.length <= fields.length) {
-            return row;
-        }
-
         final Object[] filtered = new Object[fields.length];
         for (int i = 0; i < fields.length; i++) {
             final int indexToKeep = fields[i];
@@ -125,7 +121,7 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
         rawIn = session.read(flowFile);
 
         try {
-            recordParser = recordParserFactory.createRecordReader(rawIn, logger);
+            recordParser = recordParserFactory.createRecordReader(flowFile, rawIn, logger);
         } catch (final MalformedRecordException | IOException e) {
             throw new ProcessException("Failed to reset stream", e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
index a23dcfa..27f0c42 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
@@ -136,7 +136,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
 
         RecordSchema schema;
         try (final InputStream in = session.read(flowFile)) {
-            final RecordReader recordParser = recordParserFactory.createRecordReader(in, logger);
+            final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger);
             schema = recordParser.getSchema();
         } catch (final MalformedRecordException | IOException e) {
             throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
@@ -189,7 +189,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
                 return typeFactory.createJavaType(String.class);
             case ARRAY:
                 return typeFactory.createJavaType(Object[].class);
-            case OBJECT:
+            case RECORD:
                 return typeFactory.createJavaType(Object.class);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
index 1cc7923..0dffc0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
@@ -41,7 +41,8 @@
     	</p>
     	
     	<p>
-    		The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite.
+			The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. Please
+			note that identifiers are quoted using double-quotes, and column names/labels are case-insensitive.
     	</p>
 	</body>
 </html>
\ No newline at end of file