You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:36 UTC

[13/19] nifi git commit: NIFI-1280 added support for RecordSchema in SchemaRegistry

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
new file mode 100644
index 0000000..2102813
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.Test;
+
+public class TestWriteAvroResult {
+
+    @Test
+    public void testLogicalTypes() throws IOException, ParseException {
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
+        final WriteAvroResult writer = new WriteAvroResult(schema);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("timeMillis", RecordFieldType.TIME.getDataType()));
+        fields.add(new RecordField("timeMicros", RecordFieldType.TIME.getDataType()));
+        fields.add(new RecordField("timestampMillis", RecordFieldType.TIMESTAMP.getDataType()));
+        fields.add(new RecordField("timestampMicros", RecordFieldType.TIMESTAMP.getDataType()));
+        fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final String expectedTime = "2017-04-04 14:20:33.000";
+        final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        df.setTimeZone(TimeZone.getTimeZone("gmt"));
+        final long timeLong = df.parse(expectedTime).getTime();
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("timeMillis", new Time(timeLong));
+        values.put("timeMicros", new Time(timeLong));
+        values.put("timestampMillis", new Timestamp(timeLong));
+        values.put("timestampMicros", new Timestamp(timeLong));
+        values.put("date", new Date(timeLong));
+        final Record record = new MapRecord(recordSchema, values);
+
+        final byte[] data;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            writer.write(RecordSet.of(record.getSchema(), record), baos);
+            data = baos.toByteArray();
+        }
+
+        try (final InputStream in = new ByteArrayInputStream(data)) {
+            final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+            final Schema avroSchema = dataFileStream.getSchema();
+            GenericData.setStringType(avroSchema, StringType.String);
+
+            final GenericRecord avroRecord = dataFileStream.next();
+            final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
+            final long millisSinceMidnight = secondsSinceMidnight * 1000L;
+
+            assertEquals((int) millisSinceMidnight, avroRecord.get("timeMillis"));
+            assertEquals(millisSinceMidnight * 1000L, avroRecord.get("timeMicros"));
+            assertEquals(timeLong, avroRecord.get("timestampMillis"));
+            assertEquals(timeLong * 1000L, avroRecord.get("timestampMicros"));
+            assertEquals(17260, avroRecord.get("date"));
+        }
+    }
+
+
+    @Test
+    public void testDataTypes() throws IOException {
+        // TODO: Test Enums
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/datatypes.avsc"));
+        final WriteAvroResult writer = new WriteAvroResult(schema);
+
+        final List<RecordField> subRecordFields = Collections.singletonList(new RecordField("field1", RecordFieldType.STRING.getDataType()));
+        final RecordSchema subRecordSchema = new SimpleRecordSchema(subRecordFields);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));
+        fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
+        fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
+        fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
+        fields.add(new RecordField("bytes", RecordFieldType.ARRAY.getChoiceDataType(Collections.singletonList(RecordFieldType.BYTE.getDataType()))));
+        fields.add(new RecordField("nullOrLong", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("array", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
+        fields.add(new RecordField("record", RecordFieldType.RECORD.getRecordDataType(subRecordSchema)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final Record innerRecord = new MapRecord(subRecordSchema, Collections.singletonMap("field1", "hello"));
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("string", "hello");
+        values.put("int", 8);
+        values.put("long", 42L);
+        values.put("double", 3.14159D);
+        values.put("float", 1.23456F);
+        values.put("boolean", true);
+        values.put("bytes", AvroTypeUtil.convertByteArray("hello".getBytes()));
+        values.put("nullOrLong", null);
+        values.put("array", new Integer[] {1, 2, 3});
+        values.put("record", innerRecord);
+
+        final Record record = new MapRecord(recordSchema, values);
+
+        final byte[] data;
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            writer.write(RecordSet.of(record.getSchema(), record), baos);
+            data = baos.toByteArray();
+        }
+
+        try (final InputStream in = new ByteArrayInputStream(data)) {
+            final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+            final Schema avroSchema = dataFileStream.getSchema();
+            GenericData.setStringType(avroSchema, StringType.String);
+
+            final GenericRecord avroRecord = dataFileStream.next();
+            assertMatch(record, avroRecord);
+        }
+    }
+
+    private void assertMatch(final Record record, final GenericRecord avroRecord) {
+        for (final String fieldName : record.getSchema().getFieldNames()) {
+            Object avroValue = avroRecord.get(fieldName);
+            final Object recordValue = record.getValue(fieldName);
+
+            if (recordValue instanceof String) {
+                assertNotNull(fieldName + " should not have been null", avroValue);
+                avroValue = avroValue.toString();
+            }
+
+            if (recordValue instanceof Object[] && avroValue instanceof ByteBuffer) {
+                final ByteBuffer bb = (ByteBuffer) avroValue;
+                final Object[] objectArray = (Object[]) recordValue;
+                assertEquals("For field " + fieldName + ", byte buffer remaining should have been " + objectArray.length + " but was " + bb.remaining(),
+                    objectArray.length, bb.remaining());
+
+                for (int i = 0; i < objectArray.length; i++) {
+                    assertEquals(objectArray[i], bb.get());
+                }
+            } else if (recordValue instanceof Object[]) {
+                assertTrue(fieldName + " should have been instanceof Array", avroValue instanceof Array);
+                final Array<?> avroArray = (Array<?>) avroValue;
+                final Object[] recordArray = (Object[]) recordValue;
+                assertEquals(fieldName + " not equal", recordArray.length, avroArray.size());
+                for (int i = 0; i < recordArray.length; i++) {
+                    assertEquals(fieldName + "[" + i + "] not equal", recordArray[i], avroArray.get(i));
+                }
+            } else if (recordValue instanceof byte[]) {
+                final ByteBuffer bb = ByteBuffer.wrap((byte[]) recordValue);
+                assertEquals(fieldName + " not equal", bb, avroValue);
+            } else if (recordValue instanceof Record) {
+                assertMatch((Record) recordValue, (GenericRecord) avroValue);
+            } else {
+                assertEquals(fieldName + " not equal", recordValue, avroValue);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index 1e53d89..cb790f1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -20,18 +20,24 @@ package org.apache.nifi.csv;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.List;
-import java.util.Map;
+import java.util.TimeZone;
 
+import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Assert;
@@ -39,21 +45,50 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestCSVRecordReader {
-    private final DataType stringDataType = RecordFieldType.STRING.getDataType();
     private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
-    private final DataType timeDataType = RecordFieldType.TIME.getDataType();
+    private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"');
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    @Test
+    public void testDate() throws IOException, MalformedRecordException {
+        final String text = "date\n11/30/1983";
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+            final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format,
+                "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+
+            final Record record = reader.nextRecord();
+            final java.sql.Date date = (Date) record.getValue("date");
+            final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+            calendar.setTimeInMillis(date.getTime());
+
+            assertEquals(1983, calendar.get(Calendar.YEAR));
+            assertEquals(10, calendar.get(Calendar.MONTH));
+            assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+        }
+    }
 
     @Test
     public void testSimpleParse() throws IOException, MalformedRecordException {
-        final Map<String, DataType> overrides = new HashMap<>();
-        overrides.put("balance", doubleDataType);
-        overrides.put("other", timeDataType);
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) {
-            final CSVRecordReader reader = new CSVRecordReader(fis, null, overrides);
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
-            verifyFields(schema);
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"))) {
+            final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
 
             final Object[] record = reader.nextRecord().getValues();
             final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -65,14 +100,14 @@ public class TestCSVRecordReader {
 
     @Test
     public void testMultipleRecords() throws IOException, MalformedRecordException {
-        final Map<String, DataType> overrides = new HashMap<>();
-        overrides.put("balance", doubleDataType);
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) {
-            final CSVRecordReader reader = new CSVRecordReader(fis, null, overrides);
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
-            verifyFields(schema);
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"))) {
+            final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
 
             final Object[] firstRecord = reader.nextRecord().getValues();
             final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -88,14 +123,14 @@ public class TestCSVRecordReader {
 
     @Test
     public void testExtraWhiteSpace() throws IOException, MalformedRecordException {
-        final Map<String, DataType> overrides = new HashMap<>();
-        overrides.put("balance", doubleDataType);
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) {
-            final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), overrides);
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
-            verifyFields(schema);
+        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"))) {
+            final CSVRecordReader reader = new CSVRecordReader(fis, Mockito.mock(ComponentLog.class), schema, format,
+                RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
 
             final Object[] firstRecord = reader.nextRecord().getValues();
             final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
@@ -108,15 +143,4 @@ public class TestCSVRecordReader {
             assertNull(reader.nextRecord());
         }
     }
-
-    private void verifyFields(final RecordSchema schema) {
-        final List<String> fieldNames = schema.getFieldNames();
-        final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
-        assertEquals(expectedFieldNames, fieldNames);
-
-        final List<DataType> dataTypes = schema.getDataTypes();
-        final List<DataType> expectedDataTypes = Arrays.asList(stringDataType, stringDataType, doubleDataType,
-            stringDataType, stringDataType, stringDataType, stringDataType, stringDataType);
-        assertEquals(expectedDataTypes, dataTypes);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index 04f8479..1e8997b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -26,12 +26,16 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
@@ -47,7 +51,8 @@ public class TestWriteCSVResult {
 
     @Test
     public void testDataTypes() throws IOException {
-        final WriteCSVResult result = new WriteCSVResult(RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
+        final CSVFormat csvFormat = CSVFormat.DEFAULT.withQuoteMode(QuoteMode.ALL).withRecordSeparator("\n");
+        final WriteCSVResult result = new WriteCSVResult(csvFormat, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
 
         final StringBuilder headerBuilder = new StringBuilder();
         final List<RecordField> fields = new ArrayList<>();
@@ -57,7 +62,7 @@ public class TestWriteCSVResult {
                 possibleTypes.add(RecordFieldType.INT.getDataType());
                 possibleTypes.add(RecordFieldType.LONG.getDataType());
 
-                fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType(possibleTypes)));
+                fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes)));
             } else {
                 fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
             }
@@ -81,7 +86,7 @@ public class TestWriteCSVResult {
         valueMap.put("date", new Date(now));
         valueMap.put("time", new Time(now));
         valueMap.put("timestamp", new Timestamp(now));
-        valueMap.put("object", null);
+        valueMap.put("record", null);
         valueMap.put("choice", 48L);
         valueMap.put("array", null);
 
@@ -105,9 +110,9 @@ public class TestWriteCSVResult {
         final StringBuilder expectedBuilder = new StringBuilder();
         expectedBuilder.append("\"string\",\"true\",\"1\",\"c\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",");
 
-        final String dateValue = new SimpleDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
-        final String timeValue = new SimpleDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
-        final String timestampValue = new SimpleDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now);
+        final String dateValue = getDateFormat(RecordFieldType.DATE.getDefaultFormat()).format(now);
+        final String timeValue = getDateFormat(RecordFieldType.TIME.getDefaultFormat()).format(now);
+        final String timestampValue = getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()).format(now);
 
         expectedBuilder.append('"').append(dateValue).append('"').append(',');
         expectedBuilder.append('"').append(timeValue).append('"').append(',');
@@ -118,4 +123,10 @@ public class TestWriteCSVResult {
         assertEquals(expectedValues, values);
     }
 
+    private DateFormat getDateFormat(final String format) {
+        final DateFormat df = new SimpleDateFormat(format);
+        df.setTimeZone(TimeZone.getTimeZone("gmt"));
+        return df;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index 3757ab1..a741ad1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -28,7 +28,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
-import java.util.Collections;
 
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.Record;
@@ -46,7 +45,7 @@ public class TestGrokRecordReader {
             grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
 
             final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
             final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
@@ -76,7 +75,7 @@ public class TestGrokRecordReader {
         final String msg = "2016-08-04 13:26:32,473 INFO [Leader Election Notification Thread-1] o.a.n.LoggerClass \n"
             + "org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces";
         final InputStream bais = new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8));
-        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, Collections.emptyMap());
+        final GrokRecordReader deserializer = new GrokRecordReader(bais, grok, null);
 
         final Object[] values = deserializer.nextRecord().getValues();
 
@@ -99,7 +98,7 @@ public class TestGrokRecordReader {
             grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
 
             final String[] logLevels = new String[] {"INFO", "INFO", "INFO", "WARN", "WARN"};
 
@@ -123,7 +122,7 @@ public class TestGrokRecordReader {
             grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \\[%{DATA:thread}\\] %{DATA:class} %{GREEDYDATA:message}?");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
 
             final String[] logLevels = new String[] {"INFO", "INFO", "ERROR", "WARN", "WARN"};
 
@@ -155,7 +154,7 @@ public class TestGrokRecordReader {
             grok.addPatternFromFile("src/main/resources/default-grok-patterns.txt");
             grok.compile("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
 
-            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, Collections.emptyMap());
+            final GrokRecordReader deserializer = new GrokRecordReader(fis, grok, null);
 
             final String[] logLevels = new String[] {"INFO", "ERROR", "INFO"};
             final String[] messages = new String[] {"message without stack trace",

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index fa41396..11e2828 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -26,17 +26,18 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Assert;
@@ -47,6 +48,10 @@ import org.mockito.Mockito;
 import com.jayway.jsonpath.JsonPath;
 
 public class TestJsonPathRowRecordReader {
+    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+    private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+
     private final LinkedHashMap<String, JsonPath> allJsonPaths = new LinkedHashMap<>();
 
     @Before
@@ -63,12 +68,36 @@ public class TestJsonPathRowRecordReader {
         allJsonPaths.put("country", JsonPath.compile("$.country"));
     }
 
+
+    // Builds a new, mutable list of the eight top-level fields in the order the tests assert on:
+    // id (INT), name (STRING), balance (DOUBLE), then address, city, state, zipCode, country (STRING).
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> defaults = new ArrayList<>();
+        defaults.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        defaults.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        defaults.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        for (final String addressPart : Arrays.asList("address", "city", "state", "zipCode", "country")) {
+            defaults.add(new RecordField(addressPart, RecordFieldType.STRING.getDataType()));
+        }
+        return defaults;
+    }
+
+    // Returns the schema used for nested account records: an INT "id" and a DOUBLE "balance".
+    private RecordSchema getAccountSchema() {
+        final RecordField idField = new RecordField("id", RecordFieldType.INT.getDataType());
+        final RecordField balanceField = new RecordField("balance", RecordFieldType.DOUBLE.getDataType());
+        final List<RecordField> nestedFields = new ArrayList<>(Arrays.asList(idField, balanceField));
+
+        return new SimpleRecordSchema(nestedFields);
+    }
+
+
     @Test
     public void testReadArray() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -91,10 +120,10 @@ public class TestJsonPathRowRecordReader {
 
     @Test
     public void testSingleJsonElement() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -119,17 +148,20 @@ public class TestJsonPathRowRecordReader {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("account", JsonPath.compile("$.account"));
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+        final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("account", accountType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "account"});
             assertEquals(expectedFieldNames, fieldNames);
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.STRING,
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
                 RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD});
             assertEquals(expectedTypes, dataTypes);
 
@@ -152,10 +184,15 @@ public class TestJsonPathRowRecordReader {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("accounts", JsonPath.compile("$.accounts"));
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+        final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
+
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {
@@ -163,7 +200,7 @@ public class TestJsonPathRowRecordReader {
             assertEquals(expectedFieldNames, fieldNames);
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.STRING,
+            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
                 RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY});
             assertEquals(expectedTypes, dataTypes);
 
@@ -177,17 +214,17 @@ public class TestJsonPathRowRecordReader {
             final Object[] array = (Object[]) lastRecord;
             assertEquals(2, array.length);
             final Object firstElement = array[0];
-            assertTrue(firstElement instanceof Map);
+            assertTrue(firstElement instanceof Record);
 
-            final Map<?, ?> firstMap = (Map<?, ?>) firstElement;
-            assertEquals(42, firstMap.get("id"));
-            assertEquals(4750.89D, firstMap.get("balance"));
+            final Record firstRecord = (Record) firstElement;
+            assertEquals(42, firstRecord.getValue("id"));
+            assertEquals(4750.89D, firstRecord.getValue("balance"));
 
             final Object secondElement = array[1];
-            assertTrue(secondElement instanceof Map);
-            final Map<?, ?> secondMap = (Map<?, ?>) secondElement;
-            assertEquals(43, secondMap.get("id"));
-            assertEquals(48212.38D, secondMap.get("balance"));
+            assertTrue(secondElement instanceof Record);
+            final Record secondRecord = (Record) secondElement;
+            assertEquals(43, secondRecord.getValue("id"));
+            assertEquals(48212.38D, secondRecord.getValue("balance"));
 
             assertNull(reader.nextRecord());
         }
@@ -195,10 +232,10 @@ public class TestJsonPathRowRecordReader {
 
     @Test
     public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -226,11 +263,14 @@ public class TestJsonPathRowRecordReader {
     public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("address2", JsonPath.compile("$.address2"));
-        final Map<String, DataType> typeOverrides = Collections.singletonMap("address2", RecordFieldType.STRING.getDataType());
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, typeOverrides, in, Mockito.mock(ComponentLog.class))) {
-            final RecordSchema schema = reader.getSchema();
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
@@ -259,10 +299,13 @@ public class TestJsonPathRowRecordReader {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("accountIds", JsonPath.compile("$.accountIds"));
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json"));
-            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, Collections.emptyMap(), in, Mockito.mock(ComponentLog.class))) {
+        final List<RecordField> fields = getDefaultFields();
+        final DataType idsType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType());
+        fields.add(new RecordField("accountIds", idsType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/primitive-type-array.json"));
+            final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "accountIds"});

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index c5ee0e3..2422206 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -25,8 +25,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,8 +34,10 @@ import java.util.stream.Collectors;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Assert;
@@ -43,13 +45,38 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestJsonTreeRowRecordReader {
+    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+    private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+    /** Creates a fresh, modifiable list of the eight standard fields; tests add to or remove from it. */
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> result = new ArrayList<>();
+        result.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        result.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        result.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        // The remaining five fields are all plain strings.
+        for (final String stringField : new String[] {"address", "city", "state", "zipCode", "country"}) {
+            result.add(new RecordField(stringField, RecordFieldType.STRING.getDataType()));
+        }
+        return result;
+    }
+
+    /** Creates the schema for a nested account record: an INT "id" plus a DOUBLE "balance". */
+    private RecordSchema getAccountSchema() {
+        final List<RecordField> nested = new ArrayList<>();
+        nested.add(new RecordField("id", RecordFieldType.INT.getDataType()));
+        nested.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+
+        return new SimpleRecordSchema(nested);
+    }
 
     @Test
     public void testReadArray() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -72,10 +99,10 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testSingleJsonElement() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -95,14 +122,14 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testElementWithNestedData() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
-
-            final RecordSchema schema = reader.getSchema();
+        final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("account", accountType));
+        fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "address", "city", "state", "zipCode", "country", "account"});
-            assertEquals(expectedFieldNames, fieldNames);
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
             final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
@@ -125,10 +152,16 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testElementWithNestedArray() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+        final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
+
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("accounts", accountsType));
+        fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {
@@ -153,10 +186,10 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -185,10 +218,12 @@ public class TestJsonTreeRowRecordReader {
         final Map<String, DataType> overrides = new HashMap<>();
         overrides.put("address2", RecordFieldType.STRING.getDataType());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), overrides)) {
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
 
-            final RecordSchema schema = reader.getSchema();
+        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
@@ -214,13 +249,10 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
-        final Map<String, DataType> overrides = new HashMap<>();
-        overrides.put("balance", RecordFieldType.DOUBLE.getDataType());
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), overrides)) {
-
-            final RecordSchema schema = reader.getSchema();
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
             final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
@@ -247,8 +279,22 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     public void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
+
+        final List<RecordField> fromFields = new ArrayList<>();
+        fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+        fromFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema fromSchema = new SimpleRecordSchema(fromFields);
+        final DataType fromType = RecordFieldType.RECORD.getRecordDataType(fromSchema);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("created_at", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("unicode", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("from", fromType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), Collections.emptyMap())) {
+            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index f9849ba..6119d36 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -27,12 +27,14 @@ import java.nio.file.Paths;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -60,14 +62,17 @@ public class TestWriteJsonResult {
                 possibleTypes.add(RecordFieldType.INT.getDataType());
                 possibleTypes.add(RecordFieldType.LONG.getDataType());
 
-                fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType(possibleTypes)));
+                fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getChoiceDataType(possibleTypes)));
             } else {
                 fields.add(new RecordField(fieldType.name().toLowerCase(), fieldType.getDataType()));
             }
         }
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        final long time = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS").parse("2017/01/01 17:00:00.000").getTime();
+        final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
+        df.setTimeZone(TimeZone.getTimeZone("gmt"));
+        final long time = df.parse("2017/01/01 17:00:00.000").getTime();
+
         final Map<String, Object> valueMap = new LinkedHashMap<>();
         valueMap.put("string", "string");
         valueMap.put("boolean", true);

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc
new file mode 100644
index 0000000..cc7f60e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/datatypes.avsc
@@ -0,0 +1,47 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+      "name": "string",
+      "type": "string"
+    }, {
+      "name": "int",
+      "type": "int"
+    }, {
+      "name": "long",
+      "type": "long"
+    }, {
+      "name": "double",
+      "type": "double"
+    }, {
+      "name": "float",
+      "type": "float"
+    }, {
+      "name": "boolean",
+      "type": "boolean"
+    }, {
+      "name": "bytes",
+      "type": "bytes"
+    }, {
+      "name": "nullOrLong",
+      "type": [ "null", "long" ]
+    }, {
+      "name": "array",
+      "type" : {
+      	"type": "array",
+      	"items": "int"
+      }
+    }, {
+      "name": "record",
+      "type": {
+	      "type": "record",
+	      "name": "subRecord",
+	      "fields": [
+	      	 { "name": "field1", "type": "string" }
+	      ]
+      }
+    }
+ ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
new file mode 100644
index 0000000..d8315b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/logical-types.avsc
@@ -0,0 +1,34 @@
+{
+  "namespace": "nifi",
+  "name": "data_types",
+  "type": "record",
+  "fields": [
+    {
+    	"name" : "timeMillis",
+    	"type": {
+    		"type": "int",
+    		"logicalType": "time-millis"
+    	}
+    }, {
+    	"name" : "timeMicros", "type": {
+    		"type" : "long",
+    		"logicalType" : "time-micros"
+		}
+    }, {
+    	"name" : "timestampMillis", "type": {
+    		"type" : "long",
+    		"logicalType" : "timestamp-millis"
+    	}
+	}, {
+    	"name" : "timestampMicros", "type": {
+			"type" : "long",
+			"logicalType" : "timestamp-micros"
+    	}
+    }, {
+		"name" : "date", "type": {
+			"type" : "int",
+			"logicalType" : "date"
+		}
+	}
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
new file mode 100644
index 0000000..265eb71
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to You under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-standard-services</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-schema-registry-service-api</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
new file mode 100644
index 0000000..68c2461
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.Map;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Represents a {@link ControllerService} strategy for exposing an internal
+ * and/or integrating with an external Schema Registry.
+ */
+public interface SchemaRegistry extends ControllerService, AutoCloseable {
+    /** Key {@code schema.name}; presumably the attribute holding the schema name (confirm against callers). */
+    public static final String SCHEMA_NAME_ATTR = "schema.name";
+
+
+    /**
+     * Retrieves and returns the textual representation of the schema based on
+     * the provided name of the schema available in Schema Registry. Will throw
+     * a runtime exception if schema cannot be found.
+     */
+    String retrieveSchemaText(String schemaName);
+
+    /**
+     * Retrieves and returns the textual representation of the schema based on
+     * the provided name of the schema available in Schema Registry and optional
+     * additional attributes. Will throw a runtime exception if schema cannot
+     * be found.
+     */
+    String retrieveSchemaText(String schemaName, Map<String, String> attributes);
+
+    /** Returns the {@link RecordSchema} for the given schema name. NOTE(review): missing-schema behavior is not specified here. */
+    RecordSchema retrieveSchema(String schemaName);
+
+    /** Returns the {@link RecordSchema} for the given schema name and additional attributes. NOTE(review): missing-schema behavior is not specified here. */
+    RecordSchema retrieveSchema(String schemaName, Map<String, String> attributes);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
index eae3515..5cee52e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
@@ -58,6 +58,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record-serialization-service-api</artifactId>
             <scope>compile</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml
index 3948a1b..4fac7d2 100644
--- a/nifi-nar-bundles/nifi-standard-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/pom.xml
@@ -35,6 +35,7 @@
         <module>nifi-dbcp-service-bundle</module>
         <module>nifi-hbase-client-service-api</module>
         <module>nifi-hbase_1_1_2-client-service-bundle</module>
+        <module>nifi-schema-registry-service-api</module>
         <module>nifi-record-serialization-service-api</module>
         <module>nifi-record-serialization-services-bundle</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 057832b..0173b04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -681,7 +681,7 @@ language governing permissions and limitations under the License. -->
             <dependency>
                 <groupId>org.apache.avro</groupId>
                 <artifactId>avro</artifactId>
-                <version>1.7.7</version>
+                <version>1.8.1</version>
             </dependency>
             <dependency>
                 <groupId>com.sun.jersey</groupId>
@@ -921,6 +921,11 @@ language governing permissions and limitations under the License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-registry-service</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-resources</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
                 <classifier>resources</classifier>
@@ -971,6 +976,11 @@ language governing permissions and limitations under the License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-schema-registry-service-api</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-distributed-cache-services-nar</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
                 <type>nar</type>