You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:41 UTC
[18/19] nifi git commit: NIFI-1280 added support for RecordSchema in
SchemaRegistry
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java
deleted file mode 100644
index f54a4b5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-@Tags({ "registry", "schema", "avro", "json", "transform" })
-@CapabilityDescription("Transforms JSON content of the Flow File to Avro using the schema provided by the Schema Registry Service.")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-public final class TransformJsonToAvro extends AbstractContentTransformer {
-
- /**
- *
- */
- @Override
- protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
- GenericRecord avroRecord = JsonUtils.read(in, schema);
- AvroUtils.write(avroRecord, out);
- return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java
deleted file mode 100644
index c026570..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-
-@Tags({ "registry", "schema", "csv", "json", "transform" })
-@CapabilityDescription("Transforms JSON content of the Flow File to CSV using the schema provided by the Schema Registry Service.")
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-public final class TransformJsonToCSV extends AbstractCSVTransformer {
-
- /**
- *
- */
- @Override
- protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
- GenericRecord avroRecord = JsonUtils.read(in, schema);
- CSVUtils.write(avroRecord, this.delimiter, out);
- return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index 0bb067e..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.schemaregistry.processors.TransformJsonToAvro
-org.apache.nifi.schemaregistry.processors.TransformAvroToJson
-org.apache.nifi.schemaregistry.processors.TransformCSVToAvro
-org.apache.nifi.schemaregistry.processors.TransformCSVToJson
-org.apache.nifi.schemaregistry.processors.TransformAvroToCSV
-org.apache.nifi.schemaregistry.processors.TransformJsonToCSV
-org.apache.nifi.schemaregistry.processors.ExtractAvroFields
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
deleted file mode 100644
index 058af62..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.processors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import junitparams.JUnitParamsRunner;
-import junitparams.Parameters;
-import static org.junit.Assume.assumeFalse;
-
-@RunWith(JUnitParamsRunner.class)
-public class TransformersTest {
-
- private final ClassLoader classLoader = getClass().getClassLoader();
-
- @Test
- public void validateCSVtoAvroPair() throws Exception {
- String data = "John Dow|13|blue";
- String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
- + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
- + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
-
- Schema schema = new Schema.Parser().parse(fooSchemaText);
-
- // CSV -> AVRO -> CSV
- ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
- GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- AvroUtils.write(record, out);
- byte[] avro = out.toByteArray();
-
- in = new ByteArrayInputStream(avro);
- record = AvroUtils.read(in, schema);
- out = new ByteArrayOutputStream();
- CSVUtils.write(record, '|', out);
- byte[] csv = out.toByteArray();
- assertEquals(data, new String(csv, StandardCharsets.UTF_8));
- }
-
- @Test
- public void validateCSVtoJsonPair() throws Exception {
- String data = "John Dow|13|blue";
- String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
- + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
- + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
-
- Schema schema = new Schema.Parser().parse(fooSchemaText);
-
- // CSV -> JSON -> CSV
- ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
- GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- JsonUtils.write(record, out);
- byte[] json = out.toByteArray();
-
- assertEquals("{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}", new String(json, StandardCharsets.UTF_8));
-
- in = new ByteArrayInputStream(json);
- record = JsonUtils.read(in, schema);
- out = new ByteArrayOutputStream();
- CSVUtils.write(record, '|', out);
- byte[] csv = out.toByteArray();
- assertEquals(data, new String(csv, StandardCharsets.UTF_8));
- }
-
- @Test
- public void validateJsonToAvroPair() throws Exception {
- String data = "{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}";
- String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
- + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
- + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
-
- Schema schema = new Schema.Parser().parse(fooSchemaText);
-
- // JSON -> AVRO -> JSON
- ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
- GenericRecord record = JsonUtils.read(in, schema);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- AvroUtils.write(record, out);
- byte[] avro = out.toByteArray();
-
- in = new ByteArrayInputStream(avro);
- record = AvroUtils.read(in, schema);
- out = new ByteArrayOutputStream();
- JsonUtils.write(record, out);
- byte[] csv = out.toByteArray();
- assertEquals(data, new String(csv, StandardCharsets.UTF_8));
- }
-
- @Test
- @Parameters({"input_csv/union_null_last_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_last_field_with_default.txt",
- "input_csv/union_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_with_default.txt",
- "input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt",
- "input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt",
- "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt",
- "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"})
- public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOuput) throws Exception {
- assumeFalse(isWindowsEnvironment());
- final String data = getResourceAsString(inputCSVFileName);
- final String schemaText = getResourceAsString(inputAvroSchema);
- final String result = getResourceAsString(expectedOuput);
- csvRoundTrip(data, schemaText, result);
- }
-
- private boolean isWindowsEnvironment() {
- return System.getProperty("os.name").toLowerCase().startsWith("windows");
- }
-
- @Test
- @Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt",
- "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"})
- public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema) {
- assumeFalse(isWindowsEnvironment());
- try {
- final String data = getResourceAsString(inputCSVFileName);
- final String schemaText = getResourceAsString(inputAvroSchema);
- Schema schema = new Schema.Parser().parse(schemaText);
-
- ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
- CSVUtils.read(in, '|', schema, '\"');
- }catch (IOException ioe){
- assertTrue(false);
- }catch(IllegalArgumentException iae){
- assertTrue(true);
- }
- }
-
- @Test
- public void testCSVRoundTrip() throws IOException {
- assumeFalse(isWindowsEnvironment());
- NumberFormat numberFormat = DecimalFormat.getInstance();
- numberFormat.setGroupingUsed(false);
- ((DecimalFormat) numberFormat).setParseBigDecimal(true);
-
- //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt",
- String decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(11234567.89));
- String data = getResourceAsString("input_csv/decimal_logicalType.txt");
- String schemaText = getResourceAsString("input_avro/decimal_logicalType_invalid_scale_with_default.txt");
- csvRoundTrip(data, schemaText, decimalLogicalType);
-
- // needs to be set now because scale < precision
- numberFormat.setMaximumIntegerDigits(10);
- numberFormat.setMaximumFractionDigits(3);
- numberFormat.setMinimumFractionDigits(3);
-
- //"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt",
- decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(11234567.890));
- data = getResourceAsString("input_csv/decimal_logicalType.txt");
- schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_no_default.txt");
-
- //"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt",
- decimalLogicalType = "\"fake_transactionid\"|" + numberFormat.format(new BigDecimal(0.000));
- data = getResourceAsString("input_csv/decimal_logicalType_missing_value.txt");
- schemaText = getResourceAsString("input_avro/decimal_logicalType_valid_scale_with_default.txt");
- csvRoundTrip(data, schemaText, decimalLogicalType);
- }
-
- private void csvRoundTrip(final String data, final String schemaText, final String result) {
- Schema schema = new Schema.Parser().parse(schemaText);
-
- // CSV -> AVRO -> CSV
- ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
- GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- AvroUtils.write(record, out);
- byte[] avro = out.toByteArray();
-
- in = new ByteArrayInputStream(avro);
- record = AvroUtils.read(in, schema);
- out = new ByteArrayOutputStream();
- CSVUtils.write(record, '|', out);
- byte[] csv = out.toByteArray();
- assertEquals(result, new String(csv, StandardCharsets.UTF_8));
- }
-
- /**
- * Simple wrapper around getting the test resource file that is used by the above test cases
- *
- * @param fileName - the filename of the file to read
- * @return A string that contains the body of the file.
- * @throws IOException - if an error occurs reading the file.
- */
- private String getResourceAsString(String fileName) throws IOException {
- return new String(Files.readAllBytes(FileSystems.getDefault().getPath(classLoader.getResource(fileName).getPath())));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt
deleted file mode 100644
index 1a53f85..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|11234567.890
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt
deleted file mode 100644
index 9506ad4..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|11234567.89
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt
deleted file mode 100644
index 2309e71..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|0.000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt
deleted file mode 100644
index 3a9689c..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|0
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt
deleted file mode 100644
index 77f353f..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt
+++ /dev/null
@@ -1 +0,0 @@
-"this is a simple string."|10|21474836470|1.7976931348623157E308|true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt
deleted file mode 100644
index 095f81e..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"default_string"|1234|21474836470|1.7976931348623157E308|true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt
deleted file mode 100644
index 83cbf75..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt
deleted file mode 100644
index 1b03c97..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|21474|blue
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt
deleted file mode 100644
index 9c7abb5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|blue
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt
deleted file mode 100644
index 54ba8b1..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "name": "trx_table",
- "type": "record",
- "fields": [
- {
- "name": "transactionid",
- "type": ["string", "null"]
- }, {
- "name": "amount",
- "type": "bytes",
- "logicalType": "decimal",
- "precision": 10,
- "scale": 13,
- "default": 0.0
- }]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt
deleted file mode 100644
index 8385fb1..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt
+++ /dev/null
@@ -1,16 +0,0 @@
-{
- "name": "trx_table",
- "type": "record",
- "fields": [
- {
- "name": "transactionid",
- "type": ["string", "null"]
- }, {
- "name": "amount",
- "type": "bytes",
- "logicalType": "decimal",
- "precision": 10,
- "scale": 3,
- "default": 0.0
- }]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt
deleted file mode 100644
index 9878590..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-{
- "name": "trx_table",
- "type": "record",
- "fields": [
- {
- "name": "transactionid",
- "type": ["string", "null"]
- }, {
- "name": "amount",
- "type": "bytes",
- "logicalType": "decimal",
- "precision": 10,
- "scale": 3
- }]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt
deleted file mode 100644
index 934a53c..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "type":"record",
- "name":"basic_primitive_type_check",
- "fields":[
- {"name":"string","type":"string"},
- {"name":"integer","type":"int"},
- {"name":"long","type":"long"},
- {"name":"double","type":"double"},
- {"name":"boolean","type":"boolean"}
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt
deleted file mode 100644
index abc80ca..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "type":"record",
- "name":"basic_primitive_type_check",
- "fields":[
- {"name":"string","type":["null","string"],"default":null},
- {"name":"integer","type":["null","int"],"default":null},
- {"name":"long","type":["null","long"],"default":null},
- {"name":"double","type":["null","double"],"default":null},
- {"name":"boolean","type":["null","boolean"],"default":null}
- ]
- }
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt
deleted file mode 100644
index b3ea951..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "type":"record",
- "name":"basic_primitive_type_check",
- "fields":[
- {"name":"string","type":"string","default":"default_string"},
- {"name":"integer","type":"int","default":1234},
- {"name":"long","type":"long","default":21474836470},
- {"name":"double","type":"double","default":1.7976931348623157E308},
- {"name":"boolean","type":"boolean","default":true}
- ]
- }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt
deleted file mode 100644
index e8f0e28..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "type":"record",
- "name":"basic_primitive_type_check",
- "fields":[
- {"name":"string","type":"string","default":1234},
- {"name":"integer","type":"int","default":"mismatch_int"},
- {"name":"long","type":"long","default":"mismatch_long"},
- {"name":"double","type":"double","default":"mismatch_double"},
- {"name":"boolean","type":"boolean","default":"mismatch_boolean"}
- ]
- }
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt
deleted file mode 100644
index 442a3a4..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "namespace": "example.avro",
- "type": "record",
- "name": "User",
- "fields": [{
- "name": "name",
- "type": "string",
- "default": "default_name"
- }, {
- "name": "favorite_number",
- "type": "int",
- "default": 21474
- }, {
- "name": "favorite_color",
- "type": ["null", "string"],
- "default": null
- }]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt
deleted file mode 100644
index 5222074..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-{
- "namespace": "example.avro",
- "type": "record",
- "name": "User",
- "fields": [{
- "name": "name",
- "type": "string",
- "default": "default_name"
- }, {
- "name": "favorite_number",
- "type": "int",
- "default": "mismatched_int_default"
- }, {
- "name": "favorite_color",
- "type": ["null", "string"],
- "default": null
- }]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt
deleted file mode 100644
index 1a53f85..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|11234567.890
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt
deleted file mode 100644
index 1ee2a9b..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt
+++ /dev/null
@@ -1 +0,0 @@
-"fake_transactionid"|
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt
deleted file mode 100644
index 77f353f..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt
+++ /dev/null
@@ -1 +0,0 @@
-"this is a simple string."|10|21474836470|1.7976931348623157E308|true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt
deleted file mode 100644
index b60c01b..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-"default_string"||21474836470||true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt
deleted file mode 100644
index 83cbf75..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt
deleted file mode 100644
index 5a706ac..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew||blue
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt
deleted file mode 100644
index 9c7abb5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew|13|blue
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt
deleted file mode 100644
index 5a706ac..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt
+++ /dev/null
@@ -1 +0,0 @@
-andrew||blue
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index d99dc64..3cfae30 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -23,6 +23,18 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
new file mode 100644
index 0000000..13b1d5d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+@Tags({"schema", "registry", "avro", "json", "csv"})
+@CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema "
+ + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
+ + "representation of the actual schema following the syntax and semantics of Avro's Schema format.")
+public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+
+ private final Map<String, String> schemaNameToSchemaMap;
+
+ private static final String LOGICAL_TYPE_DATE = "date";
+ private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
+ private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
+ private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
+ private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
+
+
+ public AvroSchemaRegistry() {
+ this.schemaNameToSchemaMap = new HashMap<>();
+ }
+
+ @OnEnabled
+ public void enable(ConfigurationContext configuratiponContext) throws InitializationException {
+ this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream()
+ .filter(propEntry -> propEntry.getKey().isDynamic())
+ .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
+ }
+
+ @Override
+ public String retrieveSchemaText(String schemaName) {
+ if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
+ throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + ".");
+ } else {
+ return this.schemaNameToSchemaMap.get(schemaName);
+ }
+ }
+
+ @Override
+ public String retrieveSchemaText(String schemaName, Map<String, String> attributes) {
+ throw new UnsupportedOperationException("This version of schema registry does not "
+ + "support this operation, since schemas are only identofied by name.");
+ }
+
+ @Override
+ @OnDisabled
+ public void close() throws Exception {
+ this.schemaNameToSchemaMap.clear();
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(new AvroSchemaValidator())
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .build();
+ }
+
+
+ @Override
+ public RecordSchema retrieveSchema(String schemaName) {
+ final String schemaText = this.retrieveSchemaText(schemaName);
+ final Schema schema = new Schema.Parser().parse(schemaText);
+ return createRecordSchema(schema);
+ }
+
+ /**
+ * Converts an Avro Schema to a RecordSchema
+ *
+ * @param avroSchema the Avro Schema to convert
+ * @return the Corresponding Record Schema
+ */
+ private RecordSchema createRecordSchema(final Schema avroSchema) {
+ final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
+ for (final Field field : avroSchema.getFields()) {
+ final String fieldName = field.name();
+ final DataType dataType = determineDataType(field.schema());
+ recordFields.add(new RecordField(fieldName, dataType));
+ }
+
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+ return recordSchema;
+ }
+
+ /**
+ * Returns a DataType for the given Avro Schema
+ *
+ * @param avroSchema the Avro Schema to convert
+ * @return a Data Type that corresponds to the given Avro Schema
+ */
+ private DataType determineDataType(final Schema avroSchema) {
+ final Type avroType = avroSchema.getType();
+
+ final LogicalType logicalType = avroSchema.getLogicalType();
+ if (logicalType != null) {
+ final String logicalTypeName = logicalType.getName();
+ switch (logicalTypeName) {
+ case LOGICAL_TYPE_DATE:
+ return RecordFieldType.DATE.getDataType();
+ case LOGICAL_TYPE_TIME_MILLIS:
+ case LOGICAL_TYPE_TIME_MICROS:
+ return RecordFieldType.TIME.getDataType();
+ case LOGICAL_TYPE_TIMESTAMP_MILLIS:
+ case LOGICAL_TYPE_TIMESTAMP_MICROS:
+ return RecordFieldType.TIMESTAMP.getDataType();
+ }
+ }
+
+ switch (avroType) {
+ case ARRAY:
+ return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType()));
+ case BYTES:
+ case FIXED:
+ return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+ case BOOLEAN:
+ return RecordFieldType.BOOLEAN.getDataType();
+ case DOUBLE:
+ return RecordFieldType.DOUBLE.getDataType();
+ case ENUM:
+ case STRING:
+ return RecordFieldType.STRING.getDataType();
+ case FLOAT:
+ return RecordFieldType.FLOAT.getDataType();
+ case INT:
+ return RecordFieldType.INT.getDataType();
+ case LONG:
+ return RecordFieldType.LONG.getDataType();
+ case RECORD: {
+ final List<Field> avroFields = avroSchema.getFields();
+ final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
+
+ for (final Field field : avroFields) {
+ final String fieldName = field.name();
+ final Schema fieldSchema = field.schema();
+ final DataType fieldType = determineDataType(fieldSchema);
+ recordFields.add(new RecordField(fieldName, fieldType));
+ }
+
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+ return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+ }
+ case NULL:
+ case MAP:
+ return RecordFieldType.RECORD.getDataType();
+ case UNION: {
+ final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
+ .filter(s -> s.getType() != Type.NULL)
+ .collect(Collectors.toList());
+
+ if (nonNullSubSchemas.size() == 1) {
+ return determineDataType(nonNullSubSchemas.get(0));
+ }
+
+ final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
+ for (final Schema subSchema : nonNullSubSchemas) {
+ final DataType childDataType = determineDataType(subSchema);
+ possibleChildTypes.add(childDataType);
+ }
+
+ return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
+ }
+ }
+
+ return null;
+ }
+
+ /*
+ * For this implementation 'attributes' argument is ignored since the underlying storage mechanisms
+ * is based strictly on key/value pairs. In other implementation additional attributes may play a role (e.g., version id,)
+ */
+ @Override
+ public RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes) {
+ return this.retrieveSchema(schemaName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
new file mode 100644
index 0000000..32b700f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schemaregistry.services;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class AvroSchemaValidator implements Validator {
+
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ if (context.isExpressionLanguagePresent(input)) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(true)
+ .explanation("Expression Language is present")
+ .build();
+ }
+
+ try {
+ new Schema.Parser().parse(input);
+
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(true)
+ .explanation("Schema is valid")
+ .build();
+ } catch (final Exception e) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(false)
+ .explanation("Not a valid Avro Schema: " + e.getMessage())
+ .build();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
deleted file mode 100644
index fd5d0c5..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.services;
-
-import java.util.Properties;
-
-import org.apache.nifi.controller.ControllerService;
-
-/**
- * Represents {@link ControllerService} strategy to expose internal and/or
- * integrate with external Schema Registry
- */
-public interface SchemaRegistry extends ControllerService, AutoCloseable {
-
- public static final String SCHEMA_NAME_ATTR = "schema.name";
-
-
- /**
- * Retrieves and returns the textual representation of the schema based on
- * the provided name of the schema available in Schema Registry. Will throw
- * an runtime exception if schema can not be found.
- */
- String retrieveSchemaText(String schemaName);
-
- /**
- * Retrieves and returns the textual representation of the schema based on
- * the provided name of the schema available in Schema Registry and optional
- * additional attributes. Will throw an runtime exception if schema can not
- * be found.
- */
- String retrieveSchemaText(String schemaName, Properties attributes);
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java
deleted file mode 100644
index aaedea2..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.services;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.InitializationException;
-
-@Tags({ "schema", "registry", "avro", "json", "csv" })
-@CapabilityDescription("Provides a service for registering and accessing schemas. You can register schema "
- + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
- + "representation of the actual schema.")
-public class SimpleKeyValueSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
-
- private static final List<PropertyDescriptor> propertyDescriptors;
-
- static {
- propertyDescriptors = Collections.emptyList();
- }
-
- private final Map<String, String> schemaNameToSchemaMap;
-
- public SimpleKeyValueSchemaRegistry() {
- this.schemaNameToSchemaMap = new HashMap<>();
- }
-
- @OnEnabled
- public void enable(ConfigurationContext configuratiponContext) throws InitializationException {
- this.schemaNameToSchemaMap.putAll(configuratiponContext.getProperties().entrySet().stream()
- .filter(propEntry -> propEntry.getKey().isDynamic())
- .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
- }
-
- /**
- *
- */
- @Override
- public String retrieveSchemaText(String schemaName) {
- if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
- throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + ".");
- } else {
- return this.schemaNameToSchemaMap.get(schemaName);
- }
- }
-
- @Override
- public String retrieveSchemaText(String schemaName, Properties attributes) {
- throw new UnsupportedOperationException("This version of schema registry does not "
- + "support this operation, since schemas are only identofied by name.");
- }
-
- @Override
- @OnDisabled
- public void close() throws Exception {
- this.schemaNameToSchemaMap.clear();
- }
-
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true)
- .build();
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 1775b76..a000cd7 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry
\ No newline at end of file
+org.apache.nifi.schemaregistry.services.AvroSchemaRegistry
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java
deleted file mode 100644
index 29179ab..0000000
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.schemaregistry.services;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry;
-import org.junit.Test;
-
-public class SimpleKeyValueSchemaRegistryTest {
-
- @Test
- public void validateSchemaRegistrationFromrDynamicProperties() throws Exception {
- String schemaName = "fooSchema";
- ConfigurationContext configContext = mock(ConfigurationContext.class);
- Map<PropertyDescriptor, String> properties = new HashMap<>();
- PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
- .name(schemaName)
- .dynamic(true)
- .build();
- String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
- + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
- + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, "
- + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, "
- + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
- PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
- .name("barSchema")
- .dynamic(false)
- .build();
- properties.put(fooSchema, fooSchemaText);
- properties.put(barSchema, "");
- when(configContext.getProperties()).thenReturn(properties);
- SchemaRegistry delegate = new SimpleKeyValueSchemaRegistry();
- ((SimpleKeyValueSchemaRegistry)delegate).enable(configContext);
-
- String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
- assertEquals(fooSchemaText, locatedSchemaText);
- try {
- locatedSchemaText = delegate.retrieveSchemaText("barSchema");
- fail();
- } catch (Exception e) {
- // ignore
- }
- delegate.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
new file mode 100644
index 0000000..929aab9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+public class TestAvroSchemaRegistry {
+
+ @Test
+ public void validateSchemaRegistrationFromrDynamicProperties() throws Exception {
+ String schemaName = "fooSchema";
+ ConfigurationContext configContext = mock(ConfigurationContext.class);
+ Map<PropertyDescriptor, String> properties = new HashMap<>();
+ PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
+ .name(schemaName)
+ .dynamic(true)
+ .build();
+ String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
+ + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
+ + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, "
+ + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, "
+ + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
+ PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
+ .name("barSchema")
+ .dynamic(false)
+ .build();
+ properties.put(fooSchema, fooSchemaText);
+ properties.put(barSchema, "");
+ when(configContext.getProperties()).thenReturn(properties);
+ SchemaRegistry delegate = new AvroSchemaRegistry();
+ ((AvroSchemaRegistry) delegate).enable(configContext);
+
+ String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
+ assertEquals(fooSchemaText, locatedSchemaText);
+ try {
+ locatedSchemaText = delegate.retrieveSchemaText("barSchema");
+ fail();
+ } catch (Exception e) {
+ // ignore
+ }
+ delegate.close();
+ }
+
+
+ @Test
+ public void validateRecordSchemaRetrieval() throws Exception {
+ String schemaName = "fooSchema";
+ ConfigurationContext configContext = mock(ConfigurationContext.class);
+ Map<PropertyDescriptor, String> properties = new HashMap<>();
+ PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
+ .name(schemaName)
+ .dynamic(true)
+ .build();
+ String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
+ + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
+ + "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
+ + "{\"name\": \"foo\", \"type\": \"boolean\"}, "
+ + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
+ PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
+ .name("barSchema")
+ .dynamic(false)
+ .build();
+ properties.put(fooSchema, fooSchemaText);
+ properties.put(barSchema, "");
+ when(configContext.getProperties()).thenReturn(properties);
+ SchemaRegistry delegate = new AvroSchemaRegistry();
+ ((AvroSchemaRegistry) delegate).enable(configContext);
+
+ RecordSchema locatedSchema = delegate.retrieveSchema(schemaName);
+ List<RecordField> recordFields = locatedSchema.getFields();
+ assertEquals(4, recordFields.size());
+ assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(0).getDataType());
+ assertEquals("name", recordFields.get(0).getFieldName());
+ assertEquals(RecordFieldType.INT.getDataType(), recordFields.get(1).getDataType());
+ assertEquals("favorite_number", recordFields.get(1).getFieldName());
+ assertEquals(RecordFieldType.BOOLEAN.getDataType(), recordFields.get(2).getDataType());
+ assertEquals("foo", recordFields.get(2).getFieldName());
+ assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(3).getDataType());
+ assertEquals("favorite_color", recordFields.get(3).getFieldName());
+ delegate.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-registry-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml
index fa6d30f..5287a02 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml
@@ -21,12 +21,7 @@
<packaging>pom</packaging>
<description>A bundle of processors that rely on external service to obtain schema.</description>
- <properties>
- <commons-lang3.version>3.0</commons-lang3.version>
- </properties>
-
<modules>
- <module>nifi-registry-processors</module>
<module>nifi-registry-service</module>
<module>nifi-registry-nar</module>
</modules>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index e390097..295ae96 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -290,8 +290,8 @@
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
- <artifactId>calcite-example-csv</artifactId>
- <version>1.11.0</version>
+ <artifactId>calcite-core</artifactId>
+ <version>1.12.0</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
index 833a5d6..83a3d4b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
@@ -98,19 +98,22 @@ import org.apache.nifi.util.StopWatch;
+ "that is selected being routed to the relationship whose name is the property name")
public class QueryFlowFile extends AbstractProcessor {
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
- .name("Record Reader")
+ .name("record-reader")
+ .displayName("Record Reader")
.description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
.identifiesControllerService(RowRecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
- .name("Record Writer")
+ .name("record-writer")
+ .displayName("Record Writer")
.description("Specifies the Controller Service to use for writing results to a FlowFile")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.build();
static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
- .name("Include Zero Record FlowFiles")
+ .name("include-zero-record-flowfiles")
+ .displayName("Include Zero Record FlowFiles")
.description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
+ "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
.expressionLanguageSupported(false)
@@ -119,7 +122,8 @@ public class QueryFlowFile extends AbstractProcessor {
.required(true)
.build();
static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
- .name("Cache Schema")
+ .name("cache-schema")
+ .displayName("Cache Schema")
.description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
+ "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
+ "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
@@ -391,7 +395,7 @@ public class QueryFlowFile extends AbstractProcessor {
final Supplier<CalciteConnection> connectionSupplier = () -> {
final Properties properties = new Properties();
- properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());
+ properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.MYSQL_ANSI.name());
try {
final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
@@ -491,6 +495,15 @@ public class QueryFlowFile extends AbstractProcessor {
private static class SqlValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ if (context.isExpressionLanguagePresent(input)) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(true)
+ .explanation("Expression Language Present")
+ .build();
+ }
+
final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
final SqlParser parser = SqlParser.create(substituted);
try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
index 1a62d14..7daa002 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java
@@ -99,10 +99,6 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
}
// Create a new Object array that contains only the desired fields.
- if (row.length <= fields.length) {
- return row;
- }
-
final Object[] filtered = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
final int indexToKeep = fields[i];
@@ -125,7 +121,7 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
rawIn = session.read(flowFile);
try {
- recordParser = recordParserFactory.createRecordReader(rawIn, logger);
+ recordParser = recordParserFactory.createRecordReader(flowFile, rawIn, logger);
} catch (final MalformedRecordException | IOException e) {
throw new ProcessException("Failed to reset stream", e);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
index a23dcfa..27f0c42 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java
@@ -136,7 +136,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
RecordSchema schema;
try (final InputStream in = session.read(flowFile)) {
- final RecordReader recordParser = recordParserFactory.createRecordReader(in, logger);
+ final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, logger);
schema = recordParser.getSchema();
} catch (final MalformedRecordException | IOException e) {
throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
@@ -189,7 +189,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
return typeFactory.createJavaType(String.class);
case ARRAY:
return typeFactory.createJavaType(Object[].class);
- case OBJECT:
+ case RECORD:
return typeFactory.createJavaType(Object.class);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
index 1cc7923..0dffc0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html
@@ -41,7 +41,8 @@
</p>
<p>
- The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite.
+ The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. Please
+ note that identifiers are quoted using double-quotes, and column names/labels are case-insensitive.
</p>
</body>
</html>
\ No newline at end of file