You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/24 21:03:51 UTC

[5/7] nifi git commit: NIFI-3682: This closes #1682. Add Schema Access Strategy and Schema Write Strategy Record Readers and Writers; bug fixes.

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
deleted file mode 100644
index 8e1c7ed..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestQueryFlowFile {
-
-    static {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug");
-    }
-
-    private static final String REL_NAME = "success";
-
-    @Test
-    public void testSimple() throws InitializationException, IOException, SQLException {
-        final MockRecordParser parser = new MockRecordParser();
-        parser.addSchemaField("name", RecordFieldType.STRING);
-        parser.addSchemaField("age", RecordFieldType.INT);
-        parser.addRecord("Tom", 49);
-
-        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
-
-        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-        runner.addControllerService("writer", writer);
-        runner.enableControllerService(writer);
-
-        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
-        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
-        final int numIterations = 1;
-        for (int i = 0; i < numIterations; i++) {
-            runner.enqueue(new byte[0]);
-        }
-
-        runner.setThreadCount(4);
-        runner.run(2 * numIterations);
-
-        runner.assertTransferCount(REL_NAME, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
-        System.out.println(new String(out.toByteArray()));
-        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
-    }
-
-    @Test
-    public void testParseFailure() throws InitializationException, IOException, SQLException {
-        final MockRecordParser parser = new MockRecordParser();
-        parser.addSchemaField("name", RecordFieldType.STRING);
-        parser.addSchemaField("age", RecordFieldType.INT);
-        parser.addRecord("Tom", 49);
-
-        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
-
-        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-        runner.addControllerService("writer", writer);
-        runner.enableControllerService(writer);
-
-        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
-        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
-        final int numIterations = 1;
-        for (int i = 0; i < numIterations; i++) {
-            runner.enqueue(new byte[0]);
-        }
-
-        runner.setThreadCount(4);
-        runner.run(2 * numIterations);
-
-        runner.assertTransferCount(REL_NAME, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
-        System.out.println(new String(out.toByteArray()));
-        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
-    }
-
-
-    @Test
-    public void testTransformCalc() throws InitializationException, IOException, SQLException {
-        final MockRecordParser parser = new MockRecordParser();
-        parser.addSchemaField("ID", RecordFieldType.INT);
-        parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
-        parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
-        parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
-
-        parser.addRecord("008", 10.05F, 15.45F, 89.99F);
-        parser.addRecord("100", 20.25F, 25.25F, 45.25F);
-        parser.addRecord("105", 20.05F, 25.05F, 45.05F);
-        parser.addRecord("200", 34.05F, 25.05F, 75.05F);
-
-        final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");
-
-        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-        runner.addControllerService("writer", writer);
-        runner.enableControllerService(writer);
-
-        runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100");
-        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
-        runner.enqueue(new byte[0]);
-        runner.run();
-
-        runner.assertTransferCount(REL_NAME, 1);
-        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
-
-        out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
-    }
-
-
-    @Test
-    public void testAggregateFunction() throws InitializationException, IOException {
-        final MockRecordParser parser = new MockRecordParser();
-        parser.addSchemaField("name", RecordFieldType.STRING);
-        parser.addSchemaField("points", RecordFieldType.INT);
-        parser.addRecord("Tom", 1);
-        parser.addRecord("Jerry", 2);
-        parser.addRecord("Tom", 99);
-
-        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
-
-        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-        runner.addControllerService("writer", writer);
-        runner.enableControllerService(writer);
-
-        runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name");
-        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
-        runner.enqueue("");
-        runner.run();
-
-        runner.assertTransferCount(REL_NAME, 1);
-        final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0);
-        flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
-    }
-
-    @Test
-    public void testColumnNames() throws InitializationException, IOException {
-        final MockRecordParser parser = new MockRecordParser();
-        parser.addSchemaField("name", RecordFieldType.STRING);
-        parser.addSchemaField("points", RecordFieldType.INT);
-        parser.addSchemaField("greeting", RecordFieldType.STRING);
-        parser.addRecord("Tom", 1, "Hello");
-        parser.addRecord("Jerry", 2, "Hi");
-        parser.addRecord("Tom", 99, "Howdy");
-
-        final List<String> colNames = new ArrayList<>();
-        colNames.add("name");
-        colNames.add("points");
-        colNames.add("greeting");
-        colNames.add("FAV_GREETING");
-        final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames);
-
-        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
-        runner.addControllerService("parser", parser);
-        runner.enableControllerService(parser);
-        runner.addControllerService("writer", writer);
-        runner.enableControllerService(writer);
-
-        runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE");
-        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
-        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
-
-        runner.enqueue("");
-        runner.run();
-
-        runner.assertTransferCount(REL_NAME, 1);
-    }
-
-
-    private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
-        private final List<String> columnNames;
-
-        public ResultSetValidatingRecordWriter(final List<String> colNames) {
-            this.columnNames = new ArrayList<>(colNames);
-        }
-
-        @Override
-        public RecordSetWriter createWriter(ComponentLog logger) {
-            return new RecordSetWriter() {
-                @Override
-                public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
-                    final int colCount = rs.getSchema().getFieldCount();
-                    Assert.assertEquals(columnNames.size(), colCount);
-
-                    final List<String> colNames = new ArrayList<>(colCount);
-                    for (int i = 0; i < colCount; i++) {
-                        colNames.add(rs.getSchema().getField(i).getFieldName());
-                    }
-
-                    Assert.assertEquals(columnNames, colNames);
-
-                    // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
-                    // do this, we won't consume all of the data and as a result we will not close the stream properly
-                    Record record;
-                    while ((record = rs.next()) != null) {
-                        System.out.println(record);
-                    }
-
-                    return WriteResult.of(0, Collections.emptyMap());
-                }
-
-                @Override
-                public String getMimeType() {
-                    return "text/plain";
-                }
-
-                @Override
-                public WriteResult write(Record record, OutputStream out) throws IOException {
-                    return null;
-                }
-            };
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
new file mode 100644
index 0000000..32c3635
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueryRecord {
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug");
+    }
+
+    private static final String REL_NAME = "success";
+
+    @Test
+    public void testSimple() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+        parser.addRecord("Tom", 49);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        final int numIterations = 1;
+        for (int i = 0; i < numIterations; i++) {
+            runner.enqueue(new byte[0]);
+        }
+
+        runner.setThreadCount(4);
+        runner.run(2 * numIterations);
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        System.out.println(new String(out.toByteArray()));
+        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+    }
+
+    @Test
+    public void testParseFailure() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+        parser.addRecord("Tom", 49);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        final int numIterations = 1;
+        for (int i = 0; i < numIterations; i++) {
+            runner.enqueue(new byte[0]);
+        }
+
+        runner.setThreadCount(4);
+        runner.run(2 * numIterations);
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        System.out.println(new String(out.toByteArray()));
+        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+    }
+
+
+    @Test
+    public void testTransformCalc() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("ID", RecordFieldType.INT);
+        parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
+        parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
+        parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
+
+        parser.addRecord("008", 10.05F, 15.45F, 89.99F);
+        parser.addRecord("100", 20.25F, 25.25F, 45.25F);
+        parser.addRecord("105", 20.05F, 25.05F, 45.05F);
+        parser.addRecord("200", 34.05F, 25.05F, 75.05F);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+
+        out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
+    }
+
+    @Test
+    public void testHandlingWithInvalidSchema() throws InitializationException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("favorite_color", RecordFieldType.STRING);
+        parser.addSchemaField("address", RecordFieldType.STRING);
+        parser.addRecord("Tom", "blue", null);
+        parser.addRecord("Jerry", "red", null);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+        runner.enforceReadStreamsClosed(false);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "false");
+        runner.setProperty("rel1", "select * from FLOWFILE where address IS NOT NULL");
+        runner.setProperty("rel2", "select name, CAST(favorite_color AS DOUBLE) AS num from FLOWFILE");
+        runner.setProperty("rel3", "select * from FLOWFILE where address IS NOT NULL");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(QueryRecord.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testAggregateFunction() throws InitializationException, IOException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("points", RecordFieldType.INT);
+        parser.addRecord("Tom", 1);
+        parser.addRecord("Jerry", 2);
+        parser.addRecord("Tom", 99);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0);
+        flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
+    }
+
+    @Test
+    public void testColumnNames() throws InitializationException, IOException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("points", RecordFieldType.INT);
+        parser.addSchemaField("greeting", RecordFieldType.STRING);
+        parser.addRecord("Tom", 1, "Hello");
+        parser.addRecord("Jerry", 2, "Hi");
+        parser.addRecord("Tom", 99, "Howdy");
+
+        final List<String> colNames = new ArrayList<>();
+        colNames.add("name");
+        colNames.add("points");
+        colNames.add("greeting");
+        colNames.add("FAV_GREETING");
+        final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames);
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryRecord.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+    }
+
+
+    private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+        private final List<String> columnNames;
+
+        public ResultSetValidatingRecordWriter(final List<String> colNames) {
+            this.columnNames = new ArrayList<>(colNames);
+        }
+
+        @Override
+        public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) {
+            return new RecordSetWriter() {
+                @Override
+                public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                    final int colCount = rs.getSchema().getFieldCount();
+                    Assert.assertEquals(columnNames.size(), colCount);
+
+                    final List<String> colNames = new ArrayList<>(colCount);
+                    for (int i = 0; i < colCount; i++) {
+                        colNames.add(rs.getSchema().getField(i).getFieldName());
+                    }
+
+                    Assert.assertEquals(columnNames, colNames);
+
+                    // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
+                    // do this, we won't consume all of the data and as a result we will not close the stream properly
+                    Record record;
+                    while ((record = rs.next()) != null) {
+                        System.out.println(record);
+                    }
+
+                    return WriteResult.of(0, Collections.emptyMap());
+                }
+
+                @Override
+                public String getMimeType() {
+                    return "text/plain";
+                }
+
+                @Override
+                public WriteResult write(Record record, OutputStream out) throws IOException {
+                    return null;
+                }
+            };
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
index 1a39b82..fcf0d10 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
@@ -28,9 +28,10 @@ import java.util.Map;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -38,7 +39,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-public class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
+public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
     private final List<Object[]> records = new ArrayList<>();
     private final List<RecordField> fields = new ArrayList<>();
     private final int failAfterN;
@@ -61,7 +62,7 @@ public class MockRecordParser extends AbstractControllerService implements RowRe
     }
 
     @Override
-    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException {
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
         final Iterator<Object[]> itr = records.iterator();
 
         return new RecordReader() {
@@ -99,9 +100,4 @@ public class MockRecordParser extends AbstractControllerService implements RowRe
             }
         };
     }
-
-    @Override
-    public RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException {
-        return null;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
index 0a57b29..1dbfd04 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
@@ -18,10 +18,12 @@
 package org.apache.nifi.processors.standard.util.record;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
 
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -49,7 +51,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger) {
+    public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
         return new RecordSetWriter() {
             @Override
             public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
@@ -69,9 +71,11 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
                         final String val = record.getAsString(fieldName);
                         if (quoteValues) {
                             out.write("\"".getBytes());
-                            out.write(val.getBytes());
+                            if (val != null) {
+                                out.write(val.getBytes());
+                            }
                             out.write("\"".getBytes());
-                        } else {
+                        } else if (val != null) {
                             out.write(val.getBytes());
                         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java
new file mode 100644
index 0000000..2fe06f4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+public enum SchemaField {
+    SCHEMA_TEXT("Schema Text"),
+    SCHEMA_TEXT_FORMAT("Schema Text Format"),
+    SCHEMA_NAME("Schema Name"),
+    SCHEMA_IDENTIFIER("Schema Identifier"),
+    SCHEMA_VERSION("Schema Version");
+
+    private final String description;
+
+    private SchemaField(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
new file mode 100644
index 0000000..9a064ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+public class SchemaNotFoundException extends Exception {
+    public SchemaNotFoundException(final String message) {
+        super(message);
+    }
+
+    public SchemaNotFoundException(final String message, final Throwable cause) {
+        super(message, cause); // pass both: super(cause) alone would discard the caller's message
+    }
+
+    public SchemaNotFoundException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
new file mode 100644
index 0000000..7d7268e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+
+/**
+ * <p>
+ * A Controller Service that is responsible for creating a {@link RecordReader}.
+ * </p>
+ */
+public interface RecordReaderFactory extends ControllerService {
+
+    RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
index 2286f3f..e23ad20 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
@@ -17,14 +17,47 @@
 
 package org.apache.nifi.serialization;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 
 /**
  * <p>
- * A Controller Service that is responsible for creating a {@link RecordSetWriter}.
+ * A Controller Service that is responsible for creating a {@link RecordSetWriter}. The writer is created
+ * based on a FlowFile and an InputStream for that FlowFile, but it is important to note that the FlowFile passed
+ * to {@link #createWriter(ComponentLog, FlowFile, InputStream)} may not be the FlowFile that the Writer will write to.
+ * Rather, it is the FlowFile and InputStream from which the Writer's Schema should be determined. This is done because most
+ * Processors that make use of Record Writers also make use of Record Readers and the schema for the output is often determined
+ * by either reading the schema from the content of the input FlowFile or from referencing attributes of the
+ * input FlowFile.
+ * </p>
+ *
+ * <p>
+ * PLEASE NOTE: This interface is still considered 'unstable' and may change in a non-backward-compatible
+ * manner between minor or incremental releases of NiFi.
  * </p>
  */
 public interface RecordSetWriterFactory extends ControllerService {
-    RecordSetWriter createWriter(ComponentLog logger);
+
+    /**
+     * <p>
+     * Creates a new RecordSetWriter that is capable of writing record contents to an OutputStream. Note that the
+     * FlowFile and InputStream that are given may well be different than the FlowFile that the writer is intended
+     * to write to. The given FlowFile and InputStream are intended to be used for determining the schema that should
+     * be used when writing records.
+     * </p>
+     *
+     * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service
+     *            because it allows messages to be logged for the component that is calling this Controller Service.
+     * @param schemaFlowFile the FlowFile from which the schema should be determined.
+     * @param schemaFlowFileContent the contents of the FlowFile from which to determine the schema
+     * @return a RecordSetWriter that can write record sets to an OutputStream
+     * @throws SchemaNotFoundException if unable to find the schema
+     * @throws IOException if unable to read from the given InputStream
+     */
+    RecordSetWriter createWriter(ComponentLog logger, FlowFile schemaFlowFile, InputStream schemaFlowFileContent) throws SchemaNotFoundException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
deleted file mode 100644
index fbd8a21..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-/**
- * <p>
- * A Controller Service that is responsible for creating a {@link RecordReader}.
- * </p>
- */
-public interface RowRecordReaderFactory extends ControllerService {
-
-    RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException;
-
-    RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException;
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
index 246e0af..017aef1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -29,21 +29,69 @@ import java.util.stream.Collectors;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 public class SimpleRecordSchema implements RecordSchema {
     private final List<RecordField> fields;
     private final Map<String, Integer> fieldIndices;
+    private final boolean textAvailable;
+    private final String text;
+    private final String schemaFormat;
+    private final SchemaIdentifier schemaIdentifier;
 
     public SimpleRecordSchema(final List<RecordField> fields) {
+        this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY);
+    }
+
+    public SimpleRecordSchema(final List<RecordField> fields, final SchemaIdentifier id) {
+        this(fields, createText(fields), null, false, id);
+    }
+
+    public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
+        this(fields, text, schemaFormat, true, id);
+    }
+
+    private SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
+        this.text = text;
+        this.schemaFormat = schemaFormat;
+        this.schemaIdentifier = id;
+        this.textAvailable = textAvailable;
         this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
         this.fieldIndices = new HashMap<>(fields.size());
 
         int index = 0;
         for (final RecordField field : fields) {
-            fieldIndices.put(field.getFieldName(), index++);
+            Integer previousValue = fieldIndices.put(field.getFieldName(), index);
+            if (previousValue != null) {
+                throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
+            }
+
+            for (final String alias : field.getAliases()) {
+                previousValue = fieldIndices.put(alias, index);
+                if (previousValue != null) { // the alias itself collided, so report the alias rather than the owning field's name
+                    throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + alias + "'");
+                }
+            }
+
+            index++;
+        }
+    }
+
+    @Override
+    public Optional<String> getSchemaText() {
+        if (textAvailable) {
+            return Optional.ofNullable(text);
+        } else {
+            return Optional.empty();
         }
     }
 
+
+    @Override
+    public Optional<String> getSchemaFormat() {
+        return Optional.ofNullable(schemaFormat);
+    }
+
     @Override
     public List<RecordField> getFields() {
         return fields;
@@ -77,6 +125,16 @@ public class SimpleRecordSchema implements RecordSchema {
         return idx.isPresent() ? Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty();
     }
 
+    @Override
+    public Optional<RecordField> getField(final String fieldName) {
+        final OptionalInt indexOption = getFieldIndex(fieldName);
+        if (indexOption.isPresent()) {
+            return Optional.of(fields.get(indexOption.getAsInt()));
+        }
+
+        return Optional.empty();
+    }
+
     private OptionalInt getFieldIndex(final String fieldName) {
         final Integer index = fieldIndices.get(fieldName);
         return index == null ? OptionalInt.empty() : OptionalInt.of(index);
@@ -103,8 +161,7 @@ public class SimpleRecordSchema implements RecordSchema {
         return 143 + 3 * fields.hashCode();
     }
 
-    @Override
-    public String toString() {
+    private static String createText(final List<RecordField> fields) {
         final StringBuilder sb = new StringBuilder("[");
 
         for (int i = 0; i < fields.size(); i++) {
@@ -123,4 +180,14 @@ public class SimpleRecordSchema implements RecordSchema {
         sb.append("]");
         return sb.toString();
     }
+
+    @Override
+    public String toString() {
+        return text;
+    }
+
+    @Override
+    public SchemaIdentifier getIdentifier() {
+        return schemaIdentifier;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
index b72c107..6ed4bd6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.serialization.record;
 
+import java.util.Objects;
+
 public class DataType {
     private final RecordFieldType fieldType;
     private final String format;
@@ -36,7 +38,7 @@ public class DataType {
 
     @Override
     public int hashCode() {
-        return 31 + 41 * fieldType.hashCode() + 41 * (format == null ? 0 : format.hashCode());
+        return 31 + 41 * getFieldType().hashCode() + 41 * (getFormat() == null ? 0 : getFormat().hashCode());
     }
 
     @Override
@@ -52,15 +54,15 @@ public class DataType {
         }
 
         final DataType other = (DataType) obj;
-        return fieldType.equals(other.fieldType) && ((format == null && other.format == null) || (format != null && format.equals(other.format)));
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(getFormat(), other.getFormat());
     }
 
     @Override
     public String toString() {
-        if (format == null) {
-            return fieldType.toString();
+        if (getFormat() == null) {
+            return getFieldType().toString();
         } else {
-            return fieldType.toString() + ":" + format;
+            return getFieldType().toString() + ":" + getFormat();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 0bbb534..56cf909 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -42,15 +42,86 @@ public class MapRecord implements Record {
     public Object[] getValues() {
         final Object[] values = new Object[schema.getFieldCount()];
         int i = 0;
-        for (final String fieldName : schema.getFieldNames()) {
-            values[i++] = getValue(fieldName);
+        for (final RecordField recordField : schema.getFields()) {
+            values[i++] = getValue(recordField);
         }
         return values;
     }
 
     @Override
     public Object getValue(final String fieldName) {
-        return values.get(fieldName);
+        final Optional<RecordField> fieldOption = schema.getField(fieldName);
+        if (fieldOption.isPresent()) {
+            return getValue(fieldOption.get());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Object getValue(final RecordField field) {
+        Object explicitValue = getExplicitValue(field);
+        if (explicitValue != null) {
+            return explicitValue;
+        }
+
+        final Optional<RecordField> resolvedField = resolveField(field);
+        final boolean resolvedFieldDifferent = resolvedField.isPresent() && !resolvedField.get().equals(field);
+        if (resolvedFieldDifferent) {
+            explicitValue = getExplicitValue(resolvedField.get());
+            if (explicitValue != null) {
+                return explicitValue;
+            }
+        }
+
+        Object defaultValue = field.getDefaultValue();
+        if (defaultValue != null) {
+            return defaultValue;
+        }
+
+        if (resolvedFieldDifferent) {
+            return resolvedField.get().getDefaultValue();
+        }
+
+        return null;
+    }
+
+    private Optional<RecordField> resolveField(final RecordField field) {
+        Optional<RecordField> resolved = schema.getField(field.getFieldName());
+        if (resolved.isPresent()) {
+            return resolved;
+        }
+
+        for (final String alias : field.getAliases()) {
+            resolved = schema.getField(alias);
+            if (resolved.isPresent()) {
+                return resolved;
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private Object getExplicitValue(final RecordField field) {
+        final String canonicalFieldName = field.getFieldName();
+
+        // NOTE(review): lookup uses get() plus a null check rather than containsKey(), so a key that
+        // is explicitly mapped to null is treated the same as an absent key and the search falls
+        // through to the aliases. Aliases are held in a Set (see RecordField), so the order in
+        // which they are tried follows that Set's iteration order.
+        Object value = values.get(canonicalFieldName);
+        if (value != null) {
+            return value;
+        }
+
+        for (final String alias : field.getAliases()) {
+            value = values.get(alias);
+            if (value != null) {
+                return value;
+            }
+        }
+
+        return null;
     }
 
     @Override
@@ -68,6 +139,11 @@ public class MapRecord implements Record {
         return convertToString(getValue(fieldName), format);
     }
 
+    @Override
+    public String getAsString(final RecordField field, final String format) {
+        return convertToString(getValue(field), format);
+    }
+
     private String getFormat(final String optionalFormat, final RecordFieldType fieldType) {
         return (optionalFormat == null) ? fieldType.getDefaultFormat() : optionalFormat;
     }
@@ -85,42 +161,42 @@ public class MapRecord implements Record {
 
     @Override
     public Long getAsLong(final String fieldName) {
-        return DataTypeUtils.toLong(getValue(fieldName));
+        return DataTypeUtils.toLong(getValue(fieldName), fieldName);
     }
 
     @Override
     public Integer getAsInt(final String fieldName) {
-        return DataTypeUtils.toInteger(getValue(fieldName));
+        return DataTypeUtils.toInteger(getValue(fieldName), fieldName);
     }
 
     @Override
     public Double getAsDouble(final String fieldName) {
-        return DataTypeUtils.toDouble(getValue(fieldName));
+        return DataTypeUtils.toDouble(getValue(fieldName), fieldName);
     }
 
     @Override
     public Float getAsFloat(final String fieldName) {
-        return DataTypeUtils.toFloat(getValue(fieldName));
+        return DataTypeUtils.toFloat(getValue(fieldName), fieldName);
     }
 
     @Override
     public Record getAsRecord(String fieldName, final RecordSchema schema) {
-        return DataTypeUtils.toRecord(getValue(fieldName), schema);
+        return DataTypeUtils.toRecord(getValue(fieldName), schema, fieldName);
     }
 
     @Override
     public Boolean getAsBoolean(final String fieldName) {
-        return DataTypeUtils.toBoolean(getValue(fieldName));
+        return DataTypeUtils.toBoolean(getValue(fieldName), fieldName);
     }
 
     @Override
     public Date getAsDate(final String fieldName, final String format) {
-        return DataTypeUtils.toDate(getValue(fieldName), format);
+        return DataTypeUtils.toDate(getValue(fieldName), format, fieldName);
     }
 
     @Override
     public Object[] getAsArray(final String fieldName) {
-        return DataTypeUtils.toArray(getValue(fieldName));
+        return DataTypeUtils.toArray(getValue(fieldName), fieldName);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
index e1d52e9..5e5e7ba 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -38,10 +38,14 @@ public interface Record {
 
     Object getValue(String fieldName);
 
+    Object getValue(RecordField field);
+
     String getAsString(String fieldName);
 
     String getAsString(String fieldName, String format);
 
+    String getAsString(RecordField field, String format);
+
     Long getAsLong(String fieldName);
 
     Integer getAsInt(String fieldName);

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
index 135ae66..fe3d8e5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -17,29 +17,66 @@
 
 package org.apache.nifi.serialization.record;
 
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
 public class RecordField {
     private final String fieldName;
     private final DataType dataType;
+    private final Set<String> aliases;
+    private final Object defaultValue;
 
     public RecordField(final String fieldName, final DataType dataType) {
-        this.fieldName = fieldName;
-        this.dataType = dataType;
+        this(fieldName, dataType, null, Collections.emptySet());
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) {
+        this(fieldName, dataType, defaultValue, Collections.emptySet());
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases) {
+        this(fieldName, dataType, null, aliases);
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) {
+        if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) {
+            throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue
+                + "] because that is not a valid value for Data Type [" + dataType + "]");
+        }
+
+        this.fieldName = Objects.requireNonNull(fieldName);
+        this.dataType = Objects.requireNonNull(dataType);
+        this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
+        this.defaultValue = defaultValue;
     }
 
     public String getFieldName() {
         return fieldName;
     }
 
+    public Set<String> getAliases() {
+        return aliases;
+    }
+
     public DataType getDataType() {
         return dataType;
     }
 
+    public Object getDefaultValue() {
+        return defaultValue;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((dataType == null) ? 0 : dataType.hashCode());
-        result = prime * result + ((fieldName == null) ? 0 : fieldName.hashCode());
+        result = prime * result + dataType.hashCode();
+        result = prime * result + fieldName.hashCode();
+        result = prime * result + aliases.hashCode();
+        result = prime * result + ((defaultValue == null) ? 0 : defaultValue.hashCode());
         return result;
     }
 
@@ -57,7 +94,7 @@ public class RecordField {
         }
 
         RecordField other = (RecordField) obj;
-        return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName());
+        return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index cc83a41..785b8d2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 
 public enum RecordFieldType {
@@ -149,7 +150,7 @@ public enum RecordFieldType {
 
     /**
      * <p>
-     * An array field type. Records should be updated using an {@code Object[]} value for this field. Note that we are explicitly indicating that
+     * An array field type. Fields of this type use an {@code Object[]} value. Note that we are explicitly indicating that
      * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for
      * this field should be created using the {@link #getArrayDataType(DataType)} method:
      * </p>
@@ -173,7 +174,34 @@ public enum RecordFieldType {
      * </pre>
      * </code>
      */
-    ARRAY("array", null, new ArrayDataType(null));
+    ARRAY("array", null, new ArrayDataType(null)),
+
+    /**
+     * <p>
+     * A map field type. Fields of this type use a {@code Map<String, Object>} value. A Map DataType should be
+     * created by providing the {@link DataType} for the values:
+     * </p>
+     *
+     * <code>
+     * final DataType mapType = RecordFieldType.MAP.getMapDataType( RecordFieldType.STRING.getDataType() );
+     * </code>
+     *
+     * <p>
+     * A field of type MAP should always have a {@link MapDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.MAP) {
+     *     final MapDataType mapDataType = (MapDataType) dataType;
+     *     final DataType valueType = mapDataType.getValueType();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    MAP("map", null, new MapDataType(null));
 
 
     private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>();
@@ -235,11 +263,11 @@ public enum RecordFieldType {
     }
 
     /**
-     * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema.
+     * Returns a Data Type that represents an "ARRAY" type with the given element type.
      *
      * @param elementType the type of the arrays in the element
-     * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType
-     *         is not the RECORD or ARRAY type.
+     * @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
+     *         is not the ARRAY type.
      */
     public DataType getArrayDataType(final DataType elementType) {
         if (this != ARRAY) {
@@ -287,6 +315,21 @@ public enum RecordFieldType {
         return new ChoiceDataType(list);
     }
 
+    /**
+     * Builds the Data Type describing a "MAP" field whose values have the given type.
+     *
+     * @param valueDataType the DataType of the values held in the map
+     * @return a new MapDataType wrapping the given value type, or <code>null</code> when this
+     *         RecordFieldType is anything other than MAP
+     */
+    public DataType getMapDataType(final DataType valueDataType) {
+        if (this == MAP) {
+            return new MapDataType(valueDataType);
+        }
+
+        return null;
+    }
+
 
     public static RecordFieldType of(final String typeString) {
       return SIMPLE_NAME_MAP.get(typeString);

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
index 115fb51..367f2b0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
@@ -55,4 +55,25 @@ public interface RecordSchema {
      *         <code>null</code> if the schema does not contain a field with the given name
      */
     Optional<DataType> getDataType(String fieldName);
+
+    /**
+     * @return the textual representation of the schema, if one is available
+     */
+    Optional<String> getSchemaText();
+
+    /**
+     * @return the format of the schema text, if schema text is present
+     */
+    Optional<String> getSchemaFormat();
+
+    /**
+     * @param fieldName the name of the field
+     * @return an Optional RecordField for the field with the given name
+     */
+    Optional<RecordField> getField(String fieldName);
+
+    /**
+     * @return the SchemaIdentifier, which provides various attributes for identifying a schema
+     */
+    SchemaIdentifier getIdentifier();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index be064ab..b6daab7 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -157,6 +157,20 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
                 return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case Types.OTHER:
+                // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
+                if (rs.isAfterLast()) {
+                    return RecordFieldType.RECORD.getDataType();
+                }
+
+                final Object obj = rs.getObject(columnIndex);
+                if (obj == null || !(obj instanceof Record)) {
+                    return RecordFieldType.RECORD.getDataType();
+                }
+
+                final Record record = (Record) obj;
+                final RecordSchema recordSchema = record.getSchema();
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
             default:
                 return getFieldType(sqlType).getDataType();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
new file mode 100644
index 0000000..b711952
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public interface SchemaIdentifier {
+
+    /**
+     * @return the name of the schema, if one has been defined.
+     */
+    Optional<String> getName();
+
+    /**
+     * @return the identifier of the schema, if one has been defined.
+     */
+    OptionalLong getIdentifier();
+
+    /**
+     * @return the version of the schema, if one has been defined.
+     */
+    OptionalInt getVersion();
+
+
+    public static SchemaIdentifier EMPTY = new SchemaIdentifier() {
+        @Override
+        public Optional<String> getName() {
+            return Optional.empty();
+        }
+
+        @Override
+        public OptionalLong getIdentifier() {
+            return OptionalLong.empty();
+        }
+
+        @Override
+        public OptionalInt getVersion() {
+            return OptionalInt.empty();
+        }
+    };
+
+    public static SchemaIdentifier ofName(final String name) {
+        return new SchemaIdentifier() {
+            @Override
+            public Optional<String> getName() {
+                return Optional.ofNullable(name);
+            }
+
+            @Override
+            public OptionalLong getIdentifier() {
+                return OptionalLong.empty();
+            }
+
+            @Override
+            public OptionalInt getVersion() {
+                return OptionalInt.empty();
+            }
+        };
+    }
+
+    public static SchemaIdentifier of(final String name, final long identifier, final int version) {
+        return new SchemaIdentifier() {
+            @Override
+            public Optional<String> getName() {
+                return Optional.ofNullable(name);
+            }
+
+            @Override
+            public OptionalLong getIdentifier() {
+                return OptionalLong.of(identifier);
+            }
+
+            @Override
+            public OptionalInt getVersion() {
+                return OptionalInt.of(version);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
index f507f23..0c21239 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
@@ -52,7 +52,7 @@ public class ArrayDataType extends DataType {
         if (obj == null) {
             return false;
         }
-        if (!(obj instanceof RecordDataType)) {
+        if (!(obj instanceof ArrayDataType)) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
index b74cdcc..038b147 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
@@ -53,7 +53,7 @@ public class ChoiceDataType extends DataType {
         if (obj == null) {
             return false;
         }
-        if (!(obj instanceof RecordDataType)) {
+        if (!(obj instanceof ChoiceDataType)) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
new file mode 100644
index 0000000..a85fb5e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class MapDataType extends DataType {
+    private final DataType valueType;
+
+    public MapDataType(final DataType elementType) {
+        super(RecordFieldType.MAP, null);
+        this.valueType = elementType;
+    }
+
+    public DataType getValueType() {
+        return valueType;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.MAP;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * Objects.hashCode(valueType);
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof evaluates to false for null, so this also rejects a null argument.
+        if (!(obj instanceof MapDataType)) {
+            return false;
+        }
+
+        // Null-safe comparison: valueType may be null (RecordFieldType.MAP is declared with
+        // new MapDataType(null)), so calling equals() on getValueType() directly could throw.
+        final MapDataType other = (MapDataType) obj;
+        return Objects.equals(valueType, other.valueType);
+    }
+
+    @Override
+    public String toString() {
+        return "MAP[" + valueType + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/50ea1083/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
index f24d036..006d34c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
@@ -60,4 +60,9 @@ public class RecordDataType extends DataType {
         final RecordDataType other = (RecordDataType) obj;
         return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
     }
+
+    @Override
+    public String toString() {
+        return RecordFieldType.RECORD.toString();
+    }
 }