You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:40 UTC

[17/19] nifi git commit: NIFI-1280 added support for RecordSchema in SchemaRegistry

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
index 41469ba..8e1c7ed 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
@@ -17,31 +17,22 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -247,58 +238,14 @@ public class TestQueryFlowFile {
 
                     Assert.assertEquals(columnNames, colNames);
 
-                    return WriteResult.of(0, Collections.emptyMap());
-                }
-
-                @Override
-                public String getMimeType() {
-                    return "text/plain";
-                }
-
-                @Override
-                public WriteResult write(Record record, OutputStream out) throws IOException {
-                    return null;
-                }
-            };
-        }
-
-    }
-
-    private static class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
-        private final String header;
-
-        public MockRecordWriter(final String header) {
-            this.header = header;
-        }
-
-        @Override
-        public RecordSetWriter createWriter(final ComponentLog logger) {
-            return new RecordSetWriter() {
-                @Override
-                public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
-                    out.write(header.getBytes());
-                    out.write("\n".getBytes());
-
-                    int recordCount = 0;
-                    final int numCols = rs.getSchema().getFieldCount();
-                    Record record = null;
+                    // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
+                    // do this, we won't consume all of the data and as a result we will not close the stream properly
+                    Record record;
                     while ((record = rs.next()) != null) {
-                        recordCount++;
-                        int i = 0;
-                        for (final String fieldName : record.getSchema().getFieldNames()) {
-                            final String val = record.getAsString(fieldName);
-                            out.write("\"".getBytes());
-                            out.write(val.getBytes());
-                            out.write("\"".getBytes());
-
-                            if (i++ < numCols - 1) {
-                                out.write(",".getBytes());
-                            }
-                        }
-                        out.write("\n".getBytes());
+                        System.out.println(record);
                     }
 
-                    return WriteResult.of(recordCount, Collections.emptyMap());
+                    return WriteResult.of(0, Collections.emptyMap());
                 }
 
                 @Override
@@ -312,68 +259,7 @@ public class TestQueryFlowFile {
                 }
             };
         }
-    }
-
-    private static class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
-        private final List<Object[]> records = new ArrayList<>();
-        private final List<RecordField> fields = new ArrayList<>();
-        private final int failAfterN;
 
-        public MockRecordParser() {
-            this(-1);
-        }
-
-        public MockRecordParser(final int failAfterN) {
-            this.failAfterN = failAfterN;
-        }
-
-
-        public void addSchemaField(final String fieldName, final RecordFieldType type) {
-            fields.add(new RecordField(fieldName, type.getDataType()));
-        }
-
-        public void addRecord(Object... values) {
-            records.add(values);
-        }
-
-        @Override
-        public RecordReader createRecordReader(InputStream in, ComponentLog logger) throws IOException {
-            final Iterator<Object[]> itr = records.iterator();
-
-            return new RecordReader() {
-                private int recordCount = 0;
-
-                @Override
-                public void close() throws IOException {
-                }
-
-                @Override
-                public Record nextRecord() throws IOException, MalformedRecordException {
-                    if (failAfterN >= recordCount) {
-                        throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
-                    }
-                    recordCount++;
-
-                    if (!itr.hasNext()) {
-                        return null;
-                    }
-
-                    final Object[] values = itr.next();
-                    final Map<String, Object> valueMap = new HashMap<>();
-                    int i = 0;
-                    for (final RecordField field : fields) {
-                        final String fieldName = field.getFieldName();
-                        valueMap.put(fieldName, values[i++]);
-                    }
-
-                    return new MapRecord(new SimpleRecordSchema(fields), valueMap);
-                }
-
-                @Override
-                public RecordSchema getSchema() {
-                    return new SimpleRecordSchema(fields);
-                }
-            };
-        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
new file mode 100644
index 0000000..1a39b82
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util.record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * Test-only {@link RowRecordReaderFactory} backed by in-memory data: readers created by this factory ignore
+ * the supplied input stream and instead return the records registered through {@link #addRecord(Object...)},
+ * described by the fields registered through {@link #addSchemaField(String, RecordFieldType)}.
+ */
+public class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory {
+    private final List<Object[]> records = new ArrayList<>();
+    private final List<RecordField> fields = new ArrayList<>();
+    private final int failAfterN;
+
+    /**
+     * Creates a parser whose readers never throw the intentional failure.
+     */
+    public MockRecordParser() {
+        this(-1);
+    }
+
+    /**
+     * @param failAfterN number of {@code nextRecord()} calls that succeed before every further call throws a
+     *            {@link MalformedRecordException}; a negative value disables the intentional failure
+     */
+    public MockRecordParser(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    /**
+     * Appends a field to the schema reported by this parser.
+     *
+     * @param fieldName the name of the field
+     * @param type the field type; its {@code getDataType()} supplies the field's data type
+     */
+    public void addSchemaField(final String fieldName, final RecordFieldType type) {
+        fields.add(new RecordField(fieldName, type.getDataType()));
+    }
+
+    /**
+     * Registers a record to be returned by the readers this parser creates.
+     *
+     * @param values the field values, in the order in which the schema fields were added
+     */
+    public void addRecord(Object... values) {
+        records.add(values);
+    }
+
+    /**
+     * Creates a reader over the registered records. None of the arguments is consulted.
+     */
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException {
+        final Iterator<Object[]> itr = records.iterator();
+
+        return new RecordReader() {
+            private int recordCount = 0;
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, MalformedRecordException {
+                // Only fail once failAfterN reads have succeeded. Comparing failAfterN >= recordCount instead
+                // would throw on the very first read for every non-negative failAfterN.
+                if (failAfterN >= 0 && recordCount >= failAfterN) {
+                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+                }
+                recordCount++;
+
+                if (!itr.hasNext()) {
+                    return null;
+                }
+
+                final Object[] values = itr.next();
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++]);
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+
+    /**
+     * Returns the schema built from the registered fields, matching what the readers created by this parser
+     * report. The FlowFile is not consulted.
+     */
+    @Override
+    public RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException {
+        return new SimpleRecordSchema(fields);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
new file mode 100644
index 0000000..1cf2a28
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard.util.record;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+/**
+ * Test-only {@link RecordSetWriterFactory} whose writers emit a fixed header line followed by one line per
+ * record, with every field value wrapped in double quotes and the values separated by commas.
+ */
+public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+    private final String header;
+
+    /**
+     * @param header the text written, followed by a newline, before any records
+     */
+    public MockRecordWriter(final String header) {
+        this.header = header;
+    }
+
+    /**
+     * Creates a writer that produces the quoted, comma-separated text form. The logger is not used.
+     */
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new RecordSetWriter() {
+            @Override
+            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                out.write(header.getBytes(StandardCharsets.UTF_8));
+                out.write("\n".getBytes(StandardCharsets.UTF_8));
+
+                int recordCount = 0;
+                final int numCols = rs.getSchema().getFieldCount();
+                Record record = null;
+                while ((record = rs.next()) != null) {
+                    recordCount++;
+                    int i = 0;
+                    for (final String fieldName : record.getSchema().getFieldNames()) {
+                        final String val = record.getAsString(fieldName);
+                        out.write("\"".getBytes(StandardCharsets.UTF_8));
+                        // A null field value is written as an empty quoted string instead of failing with a NullPointerException.
+                        if (val != null) {
+                            out.write(val.getBytes(StandardCharsets.UTF_8));
+                        }
+                        out.write("\"".getBytes(StandardCharsets.UTF_8));
+
+                        if (i++ < numCols - 1) {
+                            out.write(",".getBytes(StandardCharsets.UTF_8));
+                        }
+                    }
+                    out.write("\n".getBytes(StandardCharsets.UTF_8));
+                }
+
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
+
+            @Override
+            public String getMimeType() {
+                return "text/plain";
+            }
+
+            // Single-record writing is not implemented by this mock.
+            @Override
+            public WriteResult write(Record record, OutputStream out) throws IOException {
+                return null;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
index d7d5605..78c0381 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/pom.xml
@@ -17,7 +17,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-standard-services</artifactId>
-        <version>1.1.0-SNAPSHOT</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     
     <artifactId>nifi-record-serialization-service-api</artifactId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
index a0cfc79..b728498 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -39,7 +39,6 @@ public interface RecordReader extends Closeable {
     /**
      * Returns the next record in the stream or <code>null</code> if no more records are available.
      *
-     * @param schema the schema to use in order to determine how to interprets the fields in a record
      * @return the next record in the stream or <code>null</code> if no more records are available.
      *
      * @throws IOException if unable to read from the underlying data

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index eef8d82..aa298d9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -26,7 +26,7 @@ public interface RecordWriter {
     /**
      * Writes the given result set to the given output stream
      *
-     * @param recordSet the record set to serialize
+     * @param record the record to serialize
      * @param out the OutputStream to write to
      * @return the results of writing the data
      * @throws IOException if unable to write to the given OutputStream

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
index 5ef4c7c..fbd8a21 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RowRecordReaderFactory.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 /**
  * <p>
@@ -29,5 +31,8 @@ import org.apache.nifi.logging.ComponentLog;
  * </p>
  */
 public interface RowRecordReaderFactory extends ControllerService {
-    RecordReader createRecordReader(InputStream in, ComponentLog logger) throws MalformedRecordException, IOException;
+
+    RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException;
+
+    RecordSchema getSchema(FlowFile flowFile) throws MalformedRecordException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
index 0c187f1..b72c107 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/DataType.java
@@ -17,36 +17,15 @@
 
 package org.apache.nifi.serialization.record;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
 public class DataType {
     private final RecordFieldType fieldType;
     private final String format;
 
-    private final RecordSchema childSchema;
-    private final List<DataType> childTypes;
-
-    DataType(final RecordFieldType fieldType, final String format) {
-        this(fieldType, format, (RecordSchema) null);
-    }
-
-    DataType(final RecordFieldType fieldType, final String format, final RecordSchema childSchema) {
+    protected DataType(final RecordFieldType fieldType, final String format) {
         this.fieldType = fieldType;
         this.format = format;
-        this.childSchema = childSchema;
-        this.childTypes = Collections.emptyList();
     }
 
-    DataType(final RecordFieldType fieldType, final String format, final List<DataType> childTypes) {
-        this.fieldType = fieldType;
-        this.format = format;
-        this.childSchema = null;
-        this.childTypes = Collections.unmodifiableList(childTypes);
-    }
-
-
     public String getFormat() {
         return format;
     }
@@ -55,14 +34,6 @@ public class DataType {
         return fieldType;
     }
 
-    public Optional<RecordSchema> getChildRecordSchema() {
-        return Optional.ofNullable(childSchema);
-    }
-
-    public List<DataType> getPossibleTypes() {
-        return childTypes;
-    }
-
     @Override
     public int hashCode() {
         return 31 + 41 * fieldType.hashCode() + 41 * (format == null ? 0 : format.hashCode());

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index f3f9024..0bbb534 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -17,16 +17,13 @@
 
 package org.apache.nifi.serialization.record;
 
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
 public class MapRecord implements Record {
     private final RecordSchema schema;
     private final Map<String, Object> values;
@@ -80,220 +77,52 @@ public class MapRecord implements Record {
             return null;
         }
 
-        if (value instanceof java.sql.Date) {
-            java.sql.Date date = (java.sql.Date) value;
-            final long time = date.getTime();
-            return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).format(new java.util.Date(time));
-        }
-        if (value instanceof java.util.Date) {
-            return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).format((java.util.Date) value);
-        }
-        if (value instanceof Timestamp) {
-            java.sql.Timestamp date = (java.sql.Timestamp) value;
-            final long time = date.getTime();
-            return new SimpleDateFormat(getFormat(format, RecordFieldType.TIMESTAMP)).format(new java.util.Date(time));
-        }
-        if (value instanceof Time) {
-            java.sql.Time date = (java.sql.Time) value;
-            final long time = date.getTime();
-            return new SimpleDateFormat(getFormat(format, RecordFieldType.TIME)).format(new java.util.Date(time));
-        }
-
-        return value.toString();
+        final String dateFormat = getFormat(format, RecordFieldType.DATE);
+        final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP);
+        final String timeFormat = getFormat(format, RecordFieldType.TIME);
+        return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat);
     }
 
     @Override
     public Long getAsLong(final String fieldName) {
-        return convertToLong(getValue(fieldName), fieldName);
-    }
-
-    private Long convertToLong(final Object value, final Object fieldDesc) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).longValue();
-        }
-        if (value instanceof String) {
-            return Long.parseLong((String) value);
-        }
-        if (value instanceof Date) {
-            return ((Date) value).getTime();
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Long for field " + fieldDesc);
+        return DataTypeUtils.toLong(getValue(fieldName));
     }
 
     @Override
     public Integer getAsInt(final String fieldName) {
-        return convertToInt(getValue(fieldName), fieldName);
-    }
-
-    private Integer convertToInt(final Object value, final Object fieldDesc) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).intValue();
-        }
-        if (value instanceof String) {
-            return Integer.parseInt((String) value);
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Integer for field " + fieldDesc);
+        return DataTypeUtils.toInteger(getValue(fieldName));
     }
 
-
     @Override
     public Double getAsDouble(final String fieldName) {
-        return convertToDouble(getValue(fieldName), fieldName);
-    }
-
-    private Double convertToDouble(final Object value, final Object fieldDesc) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).doubleValue();
-        }
-        if (value instanceof String) {
-            return Double.parseDouble((String) value);
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Double for field " + fieldDesc);
+        return DataTypeUtils.toDouble(getValue(fieldName));
     }
 
     @Override
     public Float getAsFloat(final String fieldName) {
-        return convertToFloat(getValue(fieldName), fieldName);
-    }
-
-    private Float convertToFloat(final Object value, final Object fieldDesc) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).floatValue();
-        }
-        if (value instanceof String) {
-            return Float.parseFloat((String) value);
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Float for field " + fieldDesc);
+        return DataTypeUtils.toFloat(getValue(fieldName));
     }
 
     @Override
-    public Record getAsRecord(String fieldName) {
-        return convertToRecord(getValue(fieldName), fieldName);
-    }
-
-    private Record convertToRecord(final Object value, final Object fieldDesc) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Record) {
-            return (Record) value;
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Record for field " + fieldDesc);
+    public Record getAsRecord(String fieldName, final RecordSchema schema) {
+        return DataTypeUtils.toRecord(getValue(fieldName), schema);
     }
 
-
     @Override
     public Boolean getAsBoolean(final String fieldName) {
-        return convertToBoolean(getValue(fieldName), fieldName);
-    }
-
-    private Boolean convertToBoolean(final Object value, final Object fieldDesc) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Boolean) {
-            return (Boolean) value;
-        }
-        if (value instanceof String) {
-            final String string = (String) value;
-            if (string.equalsIgnoreCase("true") || string.equalsIgnoreCase("t")) {
-                return Boolean.TRUE;
-            }
-
-            if (string.equalsIgnoreCase("false") || string.equals("f")) {
-                return Boolean.FALSE;
-            }
-
-            throw new TypeMismatchException("Cannot convert String value to Boolean for field " + fieldDesc + " because it is not a valid boolean value");
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Boolean for field " + fieldDesc);
-    }
-
-    @Override
-    public Date getAsDate(final String fieldName) {
-        final Optional<DataType> dataTypeOption = schema.getDataType(fieldName);
-        if (!dataTypeOption.isPresent()) {
-            return null;
-        }
-
-        return convertToDate(getValue(fieldName), fieldName, dataTypeOption.get().getFormat());
+        return DataTypeUtils.toBoolean(getValue(fieldName));
     }
 
     @Override
     public Date getAsDate(final String fieldName, final String format) {
-        return convertToDate(getValue(fieldName), fieldName, format);
-    }
-
-    private Date convertToDate(final Object value, final Object fieldDesc, final String format) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Date) {
-            return (Date) value;
-        }
-        if (value instanceof Number) {
-            final Long time = ((Number) value).longValue();
-            return new Date(time);
-        }
-        if (value instanceof java.sql.Date) {
-            return new Date(((java.sql.Date) value).getTime());
-        }
-        if (value instanceof String) {
-            try {
-                return new SimpleDateFormat(getFormat(format, RecordFieldType.DATE)).parse((String) value);
-            } catch (final ParseException e) {
-                throw new TypeMismatchException("Cannot convert String value to date for field " + fieldDesc + " because it is not in the correct format of: " + format, e);
-            }
-        }
-
-        throw new TypeMismatchException("Cannot convert value of type " + value.getClass() + " to Boolean for field " + fieldDesc);
+        return DataTypeUtils.toDate(getValue(fieldName), format);
     }
 
     @Override
     public Object[] getAsArray(final String fieldName) {
-        return convertToArray(getValue(fieldName));
+        return DataTypeUtils.toArray(getValue(fieldName));
     }
 
-    private Object[] convertToArray(final Object value) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Object[]) {
-            return (Object[]) value;
-        }
-
-        if (value instanceof List) {
-            return ((List<?>) value).toArray();
-        }
-
-        return new Object[] {value};
-    }
 
     @Override
     public int hashCode() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
index ca85741..e1d52e9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -50,12 +50,10 @@ public interface Record {
 
     Float getAsFloat(String fieldName);
 
-    Record getAsRecord(String fieldName);
+    Record getAsRecord(String fieldName, RecordSchema schema);
 
     Boolean getAsBoolean(String fieldName);
 
-    Date getAsDate(String fieldName);
-
     Date getAsDate(String fieldName, String format);
 
     Object[] getAsArray(String fieldName);

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
index 8ad212b..cc83a41 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -17,35 +17,171 @@
 
 package org.apache.nifi.serialization.record;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
 public enum RecordFieldType {
+    /**
+     * A String field type. Fields of this type use a {@code java.lang.String} value.
+     */
     STRING("string"),
+
+    /**
+     * A boolean field type. Fields of this type use a {@code boolean} value.
+     */
     BOOLEAN("boolean"),
+
+    /**
+     * A byte field type. Fields of this type use a {@code byte} value.
+     */
     BYTE("byte"),
+
+    /**
+     * A char field type. Fields of this type use a {@code char} value.
+     */
     CHAR("char"),
+
+    /**
+     * A short field type. Fields of this type use a {@code short} value.
+     */
     SHORT("short"),
+
+    /**
+     * An int field type. Fields of this type use an {@code int} value.
+     */
     INT("int"),
+
+    /**
+     * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value.
+     */
     BIGINT("bigint"),
+
+    /**
+     * A long field type. Fields of this type use a {@code long} value.
+     */
     LONG("long"),
+
+    /**
+     * A float field type. Fields of this type use a {@code float} value.
+     */
     FLOAT("float"),
+
+    /**
+     * A double field type. Fields of this type use a {@code double} value.
+     */
     DOUBLE("double"),
+
+    /**
+     * A date field type. Fields of this type use a {@code java.sql.Date} value.
+     */
     DATE("date", "yyyy-MM-dd"),
+
+    /**
+     * A time field type. Fields of this type use a {@code java.sql.Time} value.
+     */
     TIME("time", "HH:mm:ss"),
+
+    /**
+     * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
+     */
     TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
-    RECORD("record"),
-    CHOICE("choice"),
-    ARRAY("array");
+
+    /**
+     * <p>
+     * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be
+     * created by providing the {@link RecordSchema} for the record:
+     * </p>
+     *
+     * <code>
+     * final DataType recordType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
+     * </code>
+     *
+     * <p>
+     * A field of type RECORD should always have a {@link RecordDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.RECORD) {
+     *     final RecordDataType recordDataType = (RecordDataType) dataType;
+     *     final RecordSchema childSchema = recordDataType.getChildSchema();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    RECORD("record", null, new RecordDataType(null)),
+
+    /**
+     * <p>
+     * A choice field type. A field of type choice can be one of any number of different types, which are defined by the DataType that is used.
+     * For example, if a field should allow either a Long or an Integer, this can be accomplished by using:
+     * </p>
+     *
+     * <code>
+     * final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType( RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType() );
+     * </code>
+     *
+     * <p>
+     * A field of type CHOICE should always have a {@link ChoiceDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+     *     final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+     *     final List&lt;DataType&gt; allowableTypes = choiceDataType.getPossibleSubTypes();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    CHOICE("choice", null, new ChoiceDataType(Collections.emptyList())),
+
+    /**
+     * <p>
+     * An array field type. Records should be updated using an {@code Object[]} value for this field. Note that we are explicitly indicating that
+     * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for
+     * this field should be created using the {@link #getArrayDataType(DataType)} method:
+     * </p>
+     *
+     * <code>
+     * final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.INT.getDataType() );
+     * </code>
+     *
+     * <p>
+     * A field of type ARRAY should always have an {@link ArrayDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.ARRAY) {
+     *     final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+     *     final DataType elementType = arrayDataType.getElementType();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    ARRAY("array", null, new ArrayDataType(null));
 
 
     private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>();
 
     static {
-      for (RecordFieldType value : values()) {
-        SIMPLE_NAME_MAP.put(value.simpleName, value);
-      }
+        for (RecordFieldType value : values()) {
+            SIMPLE_NAME_MAP.put(value.simpleName, value);
+        }
     }
 
     private final String simpleName;
@@ -62,6 +198,12 @@ public enum RecordFieldType {
         this.defaultDataType = new DataType(this, defaultFormat);
     }
 
+    private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) {
+        this.simpleName = simpleName;
+        this.defaultFormat = defaultFormat;
+        this.defaultDataType = defaultDataType;
+    }
+
     public String getDefaultFormat() {
         return defaultFormat;
     }
@@ -78,18 +220,50 @@ public enum RecordFieldType {
     }
 
     /**
-     * Returns a Data Type that represents a "RECORD" type with the given schema.
+     * Returns a Data Type that represents a "RECORD" type whose child Record uses the given schema.
      *
-     * @param childSchema the Schema for the Record
-     * @return a DataType that represents a Record with the given schema, or <code>null</code> if this RecordFieldType
-     *         is not the RECORD type.
+     * @param childSchema the Schema of the child Record
+     * @return a RecordDataType that represents a Record with the given schema, or <code>null</code> if this RecordFieldType
+     *         is not the RECORD type; use getArrayDataType for ARRAY.
      */
-    public DataType getDataType(final RecordSchema childSchema) {
+    public DataType getRecordDataType(final RecordSchema childSchema) {
         if (this != RECORD) {
             return null;
         }
 
-        return new DataType(this, getDefaultFormat(), childSchema);
+        return new RecordDataType(childSchema);
+    }
+
+    /**
+     * Returns a Data Type that represents an "ARRAY" type whose elements are of the given type.
+     *
+     * @param elementType the type of the elements in the array
+     * @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
+     *         is not the ARRAY type.
+     */
+    public DataType getArrayDataType(final DataType elementType) {
+        if (this != ARRAY) {
+            return null;
+        }
+
+        return new ArrayDataType(elementType);
+    }
+
+
+    /**
+     * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is
+     * only applicable for a RecordFieldType of {@link #CHOICE}.
+     *
+     * @param possibleChildTypes the possible types that are allowable
+     * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
+     *         is not the CHOICE type.
+     */
+    public DataType getChoiceDataType(final List<DataType> possibleChildTypes) {
+        if (this != CHOICE) {
+            return null;
+        }
+
+        return new ChoiceDataType(possibleChildTypes);
     }
 
     /**
@@ -100,14 +274,20 @@ public enum RecordFieldType {
      * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
      *         is not the CHOICE type.
      */
-    public DataType getDataType(final List<DataType> possibleChildTypes) {
+    public DataType getChoiceDataType(final DataType... possibleChildTypes) {
         if (this != CHOICE) {
             return null;
         }
 
-        return new DataType(this, getDefaultFormat(), possibleChildTypes);
+        final List<DataType> list = new ArrayList<>(possibleChildTypes.length);
+        for (final DataType type : possibleChildTypes) {
+            list.add(type);
+        }
+
+        return new ChoiceDataType(list);
     }
 
+
     public static RecordFieldType of(final String typeString) {
       return SIMPLE_NAME_MAP.get(typeString);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index e166918..be064ab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -19,6 +19,8 @@ package org.apache.nifi.serialization.record;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.sql.Array;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -39,9 +41,11 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     private final ResultSet rs;
     private final RecordSchema schema;
     private final Set<String> rsColumnNames;
+    private boolean moreRows;
 
     public ResultSetRecordSet(final ResultSet rs) throws SQLException {
         this.rs = rs;
+        moreRows = rs.next();
         this.schema = createSchema(rs);
 
         rsColumnNames = new HashSet<>();
@@ -59,14 +63,16 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
     @Override
     public Record next() throws IOException {
         try {
-            if (rs.next()) {
-                return createRecord(rs);
+            if (moreRows) {
+                final Record record = createRecord(rs);
+                moreRows = rs.next();
+                return record;
+            } else {
+                return null;
             }
         } catch (final SQLException e) {
             throw new IOException("Could not obtain next record from ResultSet", e);
         }
-
-        return null;
     }
 
     @Override
@@ -86,7 +92,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
             final Object value;
             if (rsColumnNames.contains(fieldName)) {
-                value = rs.getObject(field.getFieldName());
+                value = normalizeValue(rs.getObject(fieldName));
             } else {
                 value = null;
             }
@@ -97,6 +103,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return new MapRecord(schema, values);
     }
 
+    // Converts a List value to an Object[]; any other value (including null) is returned unchanged.
+    private Object normalizeValue(final Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof List) {
+            return ((List<?>) value).toArray();
+        }
+
+        return value;
+    }
+
     private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
@@ -106,26 +125,149 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
             final int column = i + 1;
             final int sqlType = metadata.getColumnType(column);
 
-            final RecordFieldType fieldType = getFieldType(sqlType);
+            final DataType dataType = getDataType(sqlType, rs, column);
             final String fieldName = metadata.getColumnLabel(column);
-            final RecordField field = new RecordField(fieldName, fieldType.getDataType());
+            final RecordField field = new RecordField(fieldName, dataType);
             fields.add(field);
         }
 
         return new SimpleRecordSchema(fields);
     }
 
-    private static RecordFieldType getFieldType(final int sqlType) {
+    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
         switch (sqlType) {
             case Types.ARRAY:
-                return RecordFieldType.ARRAY;
-            case Types.BIGINT:
-            case Types.ROWID:
-                return RecordFieldType.LONG;
+                // The JDBC API does not allow us to know what the base type of an array is through the metadata.
+                // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
+                // the base type. However, if the base type is, itself, an array, we will simply return a base type of
+                // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
+                // support calling Array.getResultSet() and will throw an Exception if that is not supported.
+                if (rs.isAfterLast()) {
+                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+                }
+
+                final Array array = rs.getArray(columnIndex);
+                if (array == null) {
+                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+                }
+
+                final DataType baseType = getArrayBaseType(array);
+                return RecordFieldType.ARRAY.getArrayDataType(baseType);
             case Types.BINARY:
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
-                return RecordFieldType.ARRAY;
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            default:
+                return getFieldType(sqlType).getDataType();
+        }
+    }
+
+    /**
+     * Determines the element DataType of a JDBC Array by inspecting the Java array returned from
+     * {@link Array#getArray()}: primitive arrays map directly to their field type, while for an Object[]
+     * the first non-null element decides. STRING is returned whenever the type cannot be determined
+     * (null array value, empty or all-null Object[], or an unrecognized element class).
+     */
+    private static DataType getArrayBaseType(final Array array) throws SQLException {
+        final Object arrayValue = array.getArray();
+        if (arrayValue == null) {
+            return RecordFieldType.STRING.getDataType();
+        }
+
+        if (arrayValue instanceof byte[]) {
+            return RecordFieldType.BYTE.getDataType();
+        }
+        if (arrayValue instanceof int[]) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (arrayValue instanceof long[]) {
+            return RecordFieldType.LONG.getDataType();
+        }
+        if (arrayValue instanceof boolean[]) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (arrayValue instanceof short[]) {
+            return RecordFieldType.SHORT.getDataType();
+        }
+        if (arrayValue instanceof float[]) {
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (arrayValue instanceof double[]) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (arrayValue instanceof char[]) {
+            return RecordFieldType.CHAR.getDataType();
+        }
+        if (arrayValue instanceof Object[]) {
+            final Object[] values = (Object[]) arrayValue;
+            if (values.length == 0) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            Object valueToLookAt = null;
+            for (int i = 0; i < values.length; i++) {
+                valueToLookAt = values[i];
+                if (valueToLookAt != null) {
+                    break;
+                }
+            }
+            if (valueToLookAt == null) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            if (valueToLookAt instanceof String) {
+                return RecordFieldType.STRING.getDataType();
+            }
+            if (valueToLookAt instanceof Long) {
+                return RecordFieldType.LONG.getDataType();
+            }
+            if (valueToLookAt instanceof Integer) {
+                return RecordFieldType.INT.getDataType();
+            }
+            if (valueToLookAt instanceof Short) {
+                return RecordFieldType.SHORT.getDataType();
+            }
+            if (valueToLookAt instanceof Byte) {
+                return RecordFieldType.BYTE.getDataType();
+            }
+            if (valueToLookAt instanceof Float) {
+                return RecordFieldType.FLOAT.getDataType();
+            }
+            if (valueToLookAt instanceof Double) {
+                return RecordFieldType.DOUBLE.getDataType();
+            }
+            if (valueToLookAt instanceof Boolean) {
+                return RecordFieldType.BOOLEAN.getDataType();
+            }
+            if (valueToLookAt instanceof Character) {
+                return RecordFieldType.CHAR.getDataType();
+            }
+            if (valueToLookAt instanceof BigInteger) {
+                return RecordFieldType.BIGINT.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Time) {
+                return RecordFieldType.TIME.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Date) {
+                return RecordFieldType.DATE.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Timestamp) {
+                return RecordFieldType.TIMESTAMP.getDataType();
+            }
+            if (valueToLookAt instanceof Record) {
+                return RecordFieldType.RECORD.getDataType();
+            }
+        }
+
+        return RecordFieldType.STRING.getDataType();
+    }
+
+
+    private static RecordFieldType getFieldType(final int sqlType) {
+        switch (sqlType) {
+            case Types.BIGINT:
+            case Types.ROWID:
+                return RecordFieldType.LONG;
             case Types.BIT:
             case Types.BOOLEAN:
                 return RecordFieldType.BOOLEAN;

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
new file mode 100644
index 0000000..f507f23
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * A {@link DataType} for {@link RecordFieldType#ARRAY} fields that also carries the DataType of the array's elements.
+ */
+public class ArrayDataType extends DataType {
+    private final DataType elementType;
+
+    public ArrayDataType(final DataType elementType) {
+        super(RecordFieldType.ARRAY, null);
+        this.elementType = elementType;
+    }
+
+    public DataType getElementType() {
+        return elementType;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.ARRAY;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so this test also rejects null and keeps the cast below safe.
+        if (!(obj instanceof ArrayDataType)) {
+            return false;
+        }
+        final ArrayDataType other = (ArrayDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
+    }
+
+    @Override
+    public String toString() {
+        return "ARRAY[" + elementType + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
new file mode 100644
index 0000000..b74cdcc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * A {@link DataType} for {@link RecordFieldType#CHOICE} fields that carries the list of DataTypes a value may have.
+ */
+public class ChoiceDataType extends DataType {
+    private final List<DataType> possibleSubTypes;
+
+    public ChoiceDataType(final List<DataType> possibleSubTypes) {
+        super(RecordFieldType.CHOICE, null);
+        this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes);
+    }
+
+    public List<DataType> getPossibleSubTypes() {
+        return possibleSubTypes;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.CHOICE;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so this test also rejects null and keeps the cast below safe.
+        if (!(obj instanceof ChoiceDataType)) {
+            return false;
+        }
+        final ChoiceDataType other = (ChoiceDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes);
+    }
+
+    @Override
+    public String toString() {
+        return "CHOICE" + possibleSubTypes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
new file mode 100644
index 0000000..f24d036
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import java.util.Objects;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class RecordDataType extends DataType {
+    private final RecordSchema childSchema;
+
+    public RecordDataType(final RecordSchema childSchema) {
+        super(RecordFieldType.RECORD, null);
+        this.childSchema = childSchema;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.RECORD;
+    }
+
+    public RecordSchema getChildSchema() {
+        return childSchema;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof RecordDataType)) {
+            return false;
+        }
+
+        final RecordDataType other = (RecordDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
new file mode 100644
index 0000000..1cdefb8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -0,0 +1,608 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Consumer;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+public class DataTypeUtils {
+
+    private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
+
+    /**
+     * Converts the given value to the given data type, using the default formats reported by
+     * {@link RecordFieldType#DATE}, {@link RecordFieldType#TIME} and {@link RecordFieldType#TIMESTAMP}.
+     *
+     * @param value the value to convert
+     * @param dataType the desired type
+     * @return the converted value
+     */
+    public static Object convertType(final Object value, final DataType dataType) {
+        return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
+    }
+
+    /**
+     * Converts the given value to the given data type by dispatching on {@code dataType.getFieldType()}.
+     *
+     * @param value the value to convert
+     * @param dataType the desired type
+     * @param dateFormat the format used when converting to/from a Date
+     * @param timeFormat the format used when converting to/from a Time
+     * @param timestampFormat the format used when converting to/from a Timestamp
+     * @return the converted value, or {@code null} if the field type is not one of the handled cases
+     * @throws IllegalTypeConversionException if the type is a Choice and the (non-null) value is not
+     *             compatible with any of its possible sub-types
+     */
+    public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat) {
+        switch (dataType.getFieldType()) {
+            case BIGINT:
+                return toBigInt(value);
+            case BOOLEAN:
+                return toBoolean(value);
+            case BYTE:
+                return toByte(value);
+            case CHAR:
+                return toCharacter(value);
+            case DATE:
+                return toDate(value, dateFormat);
+            case DOUBLE:
+                return toDouble(value);
+            case FLOAT:
+                return toFloat(value);
+            case INT:
+                return toInteger(value);
+            case LONG:
+                return toLong(value);
+            case SHORT:
+                return toShort(value);
+            case STRING:
+                return toString(value, dateFormat, timeFormat, timestampFormat);
+            case TIME:
+                return toTime(value, timeFormat);
+            case TIMESTAMP:
+                return toTimestamp(value, timestampFormat);
+            case ARRAY:
+                return toArray(value);
+            case RECORD:
+                final RecordDataType recordType = (RecordDataType) dataType;
+                final RecordSchema childSchema = recordType.getChildSchema();
+                return toRecord(value, childSchema);
+            case CHOICE: {
+                if (value == null) {
+                    return null;
+                }
+
+                final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+                final DataType chosenDataType = chooseDataType(value, choiceDataType);
+                if (chosenDataType == null) {
+                    throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
+                        + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes());
+                }
+
+                // Propagate the caller-supplied formats; delegating to the two-argument overload
+                // would silently fall back to the default formats for the chosen sub-type.
+                return convertType(value, chosenDataType, dateFormat, timeFormat, timestampFormat);
+            }
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Indicates whether the given value is considered compatible with the given data type, by
+     * delegating to the type-specific {@code is...TypeCompatible} check for the data type's field type.
+     *
+     * @param value the value to inspect
+     * @param dataType the type to check the value against
+     * @return {@code true} if the type-specific check accepts the value; {@code false} otherwise,
+     *         including when the field type is not one of the handled cases
+     */
+    public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
+        final RecordFieldType fieldType = dataType.getFieldType();
+        switch (fieldType) {
+            // numeric types
+            case BIGINT:
+                return isBigIntTypeCompatible(value);
+            case BYTE:
+                return isByteTypeCompatible(value);
+            case SHORT:
+                return isShortTypeCompatible(value);
+            case INT:
+                return isIntegerTypeCompatible(value);
+            case LONG:
+                return isLongTypeCompatible(value);
+            case FLOAT:
+                return isFloatTypeCompatible(value);
+            case DOUBLE:
+                return isDoubleTypeCompatible(value);
+            // textual and boolean types
+            case BOOLEAN:
+                return isBooleanTypeCompatible(value);
+            case CHAR:
+                return isCharacterTypeCompatible(value);
+            case STRING:
+                return isStringTypeCompatible(value);
+            // temporal types are checked against the format carried by the data type
+            case DATE:
+                return isDateTypeCompatible(value, dataType.getFormat());
+            case TIME:
+                return isTimeTypeCompatible(value, dataType.getFormat());
+            case TIMESTAMP:
+                return isTimestampTypeCompatible(value, dataType.getFormat());
+            // complex types
+            case ARRAY:
+                return isArrayTypeCompatible(value);
+            case RECORD:
+                return isRecordTypeCompatible(value);
+            case CHOICE:
+                // compatible if at least one of the possible sub-types accepts the value
+                return chooseDataType(value, (ChoiceDataType) dataType) != null;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Selects the first of the Choice's possible sub-types that the given value is compatible with.
+     *
+     * @param value the value to find a type for
+     * @param choiceType the Choice whose possible sub-types are examined in iteration order
+     * @return the first compatible sub-type, or {@code null} if none is compatible
+     */
+    public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
+        for (final DataType candidate : choiceType.getPossibleSubTypes()) {
+            if (!isCompatibleDataType(value, candidate)) {
+                continue;
+            }
+
+            return candidate;
+        }
+
+        return null;
+    }
+
+    /**
+     * Converts the given value to a {@link Record}.
+     * A {@code null} value yields {@code null} and an existing Record is returned unchanged.
+     * A Map is converted into a {@link MapRecord}: entries with a null key, or whose key (as a String)
+     * has no data type in the schema, are skipped; the remaining values are converted to the schema's type.
+     *
+     * @param value the value to convert
+     * @param recordSchema the schema to use when the value is a Map
+     * @return the resulting Record, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if the value is a Map but no schema was given, or if the
+     *             value is neither a Record nor a Map
+     */
+    public static Record toRecord(final Object value, final RecordSchema recordSchema) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Record) {
+            return (Record) value;
+        }
+
+        if (!(value instanceof Map)) {
+            throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record");
+        }
+
+        if (recordSchema == null) {
+            throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
+                + " to Record because the value is a Map but no Record Schema was provided");
+        }
+
+        final Map<String, Object> converted = new HashMap<>();
+        for (final Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+            final Object rawKey = entry.getKey();
+            if (rawKey == null) {
+                continue;
+            }
+
+            final String fieldName = rawKey.toString();
+            final Optional<DataType> fieldType = recordSchema.getDataType(fieldName);
+            if (fieldType.isPresent()) {
+                converted.put(fieldName, convertType(entry.getValue(), fieldType.get()));
+            }
+        }
+
+        return new MapRecord(recordSchema, converted);
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} only if the value is a non-null {@link Record}
+     */
+    public static boolean isRecordTypeCompatible(final Object value) {
+        // instanceof already evaluates to false for null
+        return value instanceof Record;
+    }
+
+    /**
+     * Returns the given value as an Object array.
+     *
+     * @param value the value to convert
+     * @return the same array instance if the value is an {@code Object[]}, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if the value is non-null and not an {@code Object[]}
+     */
+    public static Object[] toArray(final Object value) {
+        if (value instanceof Object[]) {
+            return (Object[]) value;
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} only if the value is a non-null {@code Object[]}
+     */
+    public static boolean isArrayTypeCompatible(final Object value) {
+        // instanceof already evaluates to false for null
+        return value instanceof Object[];
+    }
+
+    /**
+     * Converts the given value to a String.
+     * Strings are returned as-is. A {@code java.sql.Date} is formatted with the date format, a
+     * {@code java.sql.Time} with the time format, and any other {@code java.util.Date} (including
+     * {@code java.sql.Timestamp}) with the timestamp format. Everything else uses {@code toString()}.
+     *
+     * @param value the value to convert
+     * @param dateFormat the pattern used for {@code java.sql.Date} values
+     * @param timeFormat the pattern used for {@code java.sql.Time} values
+     * @param timestampFormat the pattern used for all other date values
+     * @return the String form of the value, or {@code null} if the value is {@code null}
+     */
+    public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return (String) value;
+        }
+
+        if (!(value instanceof java.util.Date)) {
+            return value.toString();
+        }
+
+        // All java.sql temporal types extend java.util.Date, so only the pattern differs.
+        final String pattern;
+        if (value instanceof java.sql.Date) {
+            pattern = dateFormat;
+        } else if (value instanceof java.sql.Time) {
+            pattern = timeFormat;
+        } else {
+            pattern = timestampFormat;
+        }
+
+        return getDateFormat(pattern).format((java.util.Date) value);
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} only if the value is a non-null String or {@code java.util.Date}
+     */
+    public static boolean isStringTypeCompatible(final Object value) {
+        // instanceof already evaluates to false for null
+        return value instanceof String || value instanceof java.util.Date;
+    }
+
+    /**
+     * Converts the given value to a {@code java.sql.Date}.
+     * A {@code java.sql.Date} is returned as-is; any other {@code java.util.Date} is converted using its
+     * epoch-millisecond time; a Number is used as the millisecond value passed to the Date constructor;
+     * a String is parsed using the given format.
+     *
+     * @param value the value to convert
+     * @param format the date pattern used to parse String values
+     * @return the resulting Date, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if a String value does not match the format (the
+     *             {@link ParseException} is attached as the cause) or the value is of an unsupported type
+     */
+    public static java.sql.Date toDate(final Object value, final String format) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            return (Date) value;
+        }
+
+        // isDateTypeCompatible accepts any java.util.Date, so convert other Date subtypes rather than rejecting them
+        if (value instanceof java.util.Date) {
+            return new Date(((java.util.Date) value).getTime());
+        }
+
+        if (value instanceof Number) {
+            final long longValue = ((Number) value).longValue();
+            return new Date(longValue);
+        }
+
+        if (value instanceof String) {
+            try {
+                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
+                return new Date(utilDate.getTime());
+            } catch (final ParseException e) {
+                // keep the original parse failure as the cause so the stack trace is not lost
+                throw new IllegalTypeConversionException("Could not convert value [" + value
+                    + "] of type java.lang.String to Date because the value is not in the expected date format: " + format, e);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date");
+    }
+
+    /**
+     * Indicates whether the given value can be treated as a date.
+     *
+     * @param value the value to inspect
+     * @param format the date pattern that String values must be parseable with
+     * @return {@code true} if the value is a {@code java.util.Date}, a Number, or a String that parses
+     *         with the given format; {@code false} otherwise (including for {@code null})
+     */
+    public static boolean isDateTypeCompatible(final Object value, final String format) {
+        if (value instanceof java.util.Date || value instanceof Number) {
+            return true;
+        }
+
+        // covers null as well as every other unsupported type
+        if (!(value instanceof String)) {
+            return false;
+        }
+
+        try {
+            getDateFormat(format).parse((String) value);
+        } catch (final ParseException e) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Converts the given value to a {@code java.sql.Time}.
+     * A {@code java.sql.Time} is returned as-is; any other {@code java.util.Date} is converted using its
+     * epoch-millisecond time; a Number is used as the millisecond value passed to the Time constructor;
+     * a String is parsed using the given format.
+     *
+     * @param value the value to convert
+     * @param format the pattern used to parse String values
+     * @return the resulting Time, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if a String value does not match the format (the
+     *             {@link ParseException} is attached as the cause) or the value is of an unsupported type
+     */
+    public static Time toTime(final Object value, final String format) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Time) {
+            return (Time) value;
+        }
+
+        // isTimeTypeCompatible accepts any java.util.Date, so convert other Date subtypes rather than rejecting them
+        if (value instanceof java.util.Date) {
+            return new Time(((java.util.Date) value).getTime());
+        }
+
+        if (value instanceof Number) {
+            final long longValue = ((Number) value).longValue();
+            return new Time(longValue);
+        }
+
+        if (value instanceof String) {
+            try {
+                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
+                return new Time(utilDate.getTime());
+            } catch (final ParseException e) {
+                // keep the original parse failure as the cause so the stack trace is not lost
+                throw new IllegalTypeConversionException("Could not convert value [" + value
+                    + "] of type java.lang.String to Time because the value is not in the expected date format: " + format, e);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time");
+    }
+
+    /**
+     * Creates a new formatter for the given pattern whose time zone is set to the class's GMT zone.
+     * A fresh instance is created on every call.
+     *
+     * @param format the {@link SimpleDateFormat} pattern
+     * @return a new formatter configured for GMT
+     */
+    private static DateFormat getDateFormat(final String format) {
+        final SimpleDateFormat formatter = new SimpleDateFormat(format);
+        formatter.setTimeZone(gmt);
+        return formatter;
+    }
+
+    /**
+     * Indicates whether the given value can be treated as a time; applies exactly the same rules
+     * as {@link #isDateTypeCompatible(Object, String)}.
+     *
+     * @param value the value to inspect
+     * @param format the pattern that String values must be parseable with
+     * @return the result of the date compatibility check
+     */
+    public static boolean isTimeTypeCompatible(final Object value, final String format) {
+        final boolean compatible = isDateTypeCompatible(value, format);
+        return compatible;
+    }
+
+    /**
+     * Converts the given value to a {@code java.sql.Timestamp}.
+     * A Timestamp is returned as-is; any other {@code java.util.Date} is converted using its
+     * epoch-millisecond time; a Number is used as the millisecond value passed to the Timestamp
+     * constructor; a String is parsed using the given format.
+     *
+     * @param value the value to convert
+     * @param format the pattern used to parse String values
+     * @return the resulting Timestamp, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if a String value does not match the format (the
+     *             {@link ParseException} is attached as the cause) or the value is of an unsupported type
+     */
+    public static Timestamp toTimestamp(final Object value, final String format) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            return (Timestamp) value;
+        }
+
+        // isTimestampTypeCompatible accepts any java.util.Date, so convert other Date subtypes rather than rejecting them
+        if (value instanceof java.util.Date) {
+            return new Timestamp(((java.util.Date) value).getTime());
+        }
+
+        if (value instanceof Number) {
+            final long longValue = ((Number) value).longValue();
+            return new Timestamp(longValue);
+        }
+
+        if (value instanceof String) {
+            try {
+                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
+                return new Timestamp(utilDate.getTime());
+            } catch (final ParseException e) {
+                // keep the original parse failure as the cause so the stack trace is not lost
+                throw new IllegalTypeConversionException("Could not convert value [" + value
+                    + "] of type java.lang.String to Timestamp because the value is not in the expected date format: " + format, e);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp");
+    }
+
+    /**
+     * Indicates whether the given value can be treated as a timestamp; applies exactly the same
+     * rules as {@link #isDateTypeCompatible(Object, String)}.
+     *
+     * @param value the value to inspect
+     * @param format the pattern that String values must be parseable with
+     * @return the result of the date compatibility check
+     */
+    public static boolean isTimestampTypeCompatible(final Object value, final String format) {
+        final boolean compatible = isDateTypeCompatible(value, format);
+        return compatible;
+    }
+
+
+    /**
+     * Converts the given value to a {@link BigInteger}.
+     *
+     * @param value the value to convert; only BigInteger and Long values are supported
+     * @return the value itself if it is a BigInteger, the BigInteger equivalent of a Long,
+     *         or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static BigInteger toBigInt(final Object value) {
+        if (value instanceof BigInteger) {
+            return (BigInteger) value;
+        }
+
+        if (value instanceof Long) {
+            final long longValue = (Long) value;
+            return BigInteger.valueOf(longValue);
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger");
+    }
+
+    /**
+     * Indicates whether the given value can be converted by {@link #toBigInt(Object)} to a non-null result.
+     *
+     * @param value the value to inspect
+     * @return {@code true} only if the value is a non-null {@link BigInteger} or {@link Long}
+     */
+    public static boolean isBigIntTypeCompatible(final Object value) {
+        // The previous condition required value == null, which can never be an instance of
+        // anything, so the method always returned false. instanceof is already null-safe.
+        return value instanceof BigInteger || value instanceof Long;
+    }
+
+    /**
+     * Converts the given value to a Boolean.
+     *
+     * @param value the value to convert; a Boolean, or a String equal to "true" or "false" ignoring case
+     * @return the Boolean value, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if the value is any other String or any other type
+     */
+    public static Boolean toBoolean(final Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Boolean) {
+            return (Boolean) value;
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            if ("true".equalsIgnoreCase(text)) {
+                return Boolean.TRUE;
+            }
+            if ("false".equalsIgnoreCase(text)) {
+                return Boolean.FALSE;
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Boolean, or a String equal to "true" or "false"
+     *         ignoring case; {@code false} otherwise (including for {@code null})
+     */
+    public static boolean isBooleanTypeCompatible(final Object value) {
+        if (value instanceof Boolean) {
+            return true;
+        }
+
+        // covers null as well as every other non-String type
+        if (!(value instanceof String)) {
+            return false;
+        }
+
+        final String text = (String) value;
+        return "true".equalsIgnoreCase(text) || "false".equalsIgnoreCase(text);
+    }
+
+    /**
+     * Converts the given value to a Double.
+     *
+     * @param value the value to convert; a Number (via {@code doubleValue()}) or a String (via {@code Double.parseDouble})
+     * @return the Double value, or {@code null} if the value is {@code null}
+     * @throws NumberFormatException if a String value cannot be parsed
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static Double toDouble(final Object value) {
+        if (value instanceof Number) {
+            return ((Number) value).doubleValue();
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            return Double.parseDouble(text);
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Number or a String accepted by {@code Double.parseDouble}
+     */
+    public static boolean isDoubleTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, Double::parseDouble);
+    }
+
+    /**
+     * Shared compatibility check for the numeric types.
+     *
+     * @param value the value to inspect
+     * @param stringValueVerifier invoked with the value when it is a String; expected to throw
+     *            {@link NumberFormatException} if the String is not a valid number for the target type
+     * @return {@code true} if the value is a Number, or a String for which the verifier completes
+     *         without a NumberFormatException; {@code false} otherwise (including for {@code null})
+     */
+    private static boolean isNumberTypeCompatible(final Object value, final Consumer<String> stringValueVerifier) {
+        if (value instanceof Number) {
+            return true;
+        }
+
+        // covers null as well as every other non-String type
+        if (!(value instanceof String)) {
+            return false;
+        }
+
+        try {
+            stringValueVerifier.accept((String) value);
+        } catch (final NumberFormatException nfe) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Converts the given value to a Float.
+     *
+     * @param value the value to convert; a Number (via {@code floatValue()}) or a String (via {@code Float.parseFloat})
+     * @return the Float value, or {@code null} if the value is {@code null}
+     * @throws NumberFormatException if a String value cannot be parsed
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static Float toFloat(final Object value) {
+        if (value instanceof Number) {
+            return ((Number) value).floatValue();
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            return Float.parseFloat(text);
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Number or a String accepted by {@code Float.parseFloat}
+     */
+    public static boolean isFloatTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, Float::parseFloat);
+    }
+
+    /**
+     * Converts the given value to a Long.
+     *
+     * @param value the value to convert; a Number (via {@code longValue()}), a String (via
+     *            {@code Long.parseLong}) or a {@code java.util.Date} (via {@code getTime()})
+     * @return the Long value, or {@code null} if the value is {@code null}
+     * @throws NumberFormatException if a String value cannot be parsed
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static Long toLong(final Object value) {
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            return Long.parseLong(text);
+        }
+
+        if (value instanceof java.util.Date) {
+            final java.util.Date date = (java.util.Date) value;
+            return date.getTime();
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Number, a {@code java.util.Date}, or a String accepted
+     *         by {@code Long.parseLong}; {@code false} otherwise (including for {@code null})
+     */
+    public static boolean isLongTypeCompatible(final Object value) {
+        if (value instanceof Number || value instanceof java.util.Date) {
+            return true;
+        }
+
+        // covers null as well as every other non-String type
+        if (!(value instanceof String)) {
+            return false;
+        }
+
+        try {
+            Long.parseLong((String) value);
+        } catch (final NumberFormatException nfe) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    /**
+     * Converts the given value to an Integer.
+     *
+     * @param value the value to convert; a Number (via {@code intValue()}) or a String (via {@code Integer.parseInt})
+     * @return the Integer value, or {@code null} if the value is {@code null}
+     * @throws NumberFormatException if a String value cannot be parsed
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static Integer toInteger(final Object value) {
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            return Integer.parseInt(text);
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Number or a String accepted by {@code Integer.parseInt}
+     */
+    public static boolean isIntegerTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, Integer::parseInt);
+    }
+
+
+    /**
+     * Converts the given value to a Short.
+     *
+     * @param value the value to convert; a Number (via {@code shortValue()}) or a String (via {@code Short.parseShort})
+     * @return the Short value, or {@code null} if the value is {@code null}
+     * @throws NumberFormatException if a String value cannot be parsed
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static Short toShort(final Object value) {
+        if (value instanceof Number) {
+            return ((Number) value).shortValue();
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            return Short.parseShort(text);
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Number or a String accepted by {@code Short.parseShort}
+     */
+    public static boolean isShortTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, Short::parseShort);
+    }
+
+    /**
+     * Converts the given value to a Byte.
+     *
+     * @param value the value to convert; a Number (via {@code byteValue()}) or a String (via {@code Byte.parseByte})
+     * @return the Byte value, or {@code null} if the value is {@code null}
+     * @throws NumberFormatException if a String value cannot be parsed
+     * @throws IllegalTypeConversionException if the value is of any other type
+     */
+    public static Byte toByte(final Object value) {
+        if (value instanceof Number) {
+            return ((Number) value).byteValue();
+        }
+
+        if (value instanceof String) {
+            final String text = (String) value;
+            return Byte.parseByte(text);
+        }
+
+        if (value == null) {
+            return null;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte");
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Number or a String accepted by {@code Byte.parseByte}
+     */
+    public static boolean isByteTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, Byte::parseByte);
+    }
+
+
+    /**
+     * Converts the given value to a Character.
+     *
+     * @param value the value to convert; a Character, or a CharSequence whose first character is used
+     * @return the Character value, or {@code null} if the value is {@code null}
+     * @throws IllegalTypeConversionException if the value is an empty CharSequence or of any other type
+     */
+    public static Character toCharacter(final Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Character) {
+            return (Character) value;
+        }
+
+        if (!(value instanceof CharSequence)) {
+            throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character");
+        }
+
+        final CharSequence text = (CharSequence) value;
+        if (text.length() == 0) {
+            throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character because it has a length of 0");
+        }
+
+        return text.charAt(0);
+    }
+
+    /**
+     * @param value the value to inspect
+     * @return {@code true} if the value is a Character or a non-empty CharSequence;
+     *         {@code false} otherwise (including for {@code null})
+     */
+    public static boolean isCharacterTypeCompatible(final Object value) {
+        if (value instanceof Character) {
+            return true;
+        }
+
+        // instanceof is false for null, so the length is only read for real sequences
+        return value instanceof CharSequence && ((CharSequence) value).length() > 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
new file mode 100644
index 0000000..38b5d20
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
+/**
+ * Unchecked exception signalling that a value could not be converted to a requested type.
+ */
+public class IllegalTypeConversionException extends RuntimeException {
+
+    /**
+     * @param message a description of the failed conversion
+     */
+    public IllegalTypeConversionException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param message a description of the failed conversion
+     * @param cause the underlying failure that prevented the conversion
+     */
+    public IllegalTypeConversionException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}