You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:12:06 UTC

[13/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter - R

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
new file mode 100644
index 0000000..a6b965d
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class MapRecord implements Record {
+    private final RecordSchema schema;
+    private final Map<String, Object> values;
+
+    public MapRecord(final RecordSchema schema, final Map<String, Object> values) {
+        this.schema = Objects.requireNonNull(schema);
+        this.values = Objects.requireNonNull(values);
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public Object[] getValues() {
+        final Object[] values = new Object[schema.getFieldCount()];
+        int i = 0;
+        for (final RecordField recordField : schema.getFields()) {
+            values[i++] = getValue(recordField);
+        }
+        return values;
+    }
+
+    @Override
+    public Object getValue(final String fieldName) {
+        final Optional<RecordField> fieldOption = schema.getField(fieldName);
+        if (fieldOption.isPresent()) {
+            return getValue(fieldOption.get());
+        }
+
+        return null;
+    }
+
+    @Override
+    public Object getValue(final RecordField field) {
+        Object explicitValue = getExplicitValue(field);
+        if (explicitValue != null) {
+            return explicitValue;
+        }
+
+        final Optional<RecordField> resolvedField = resolveField(field);
+        final boolean resolvedFieldDifferent = resolvedField.isPresent() && !resolvedField.get().equals(field);
+        if (resolvedFieldDifferent) {
+            explicitValue = getExplicitValue(resolvedField.get());
+            if (explicitValue != null) {
+                return explicitValue;
+            }
+        }
+
+        Object defaultValue = field.getDefaultValue();
+        if (defaultValue != null) {
+            return defaultValue;
+        }
+
+        if (resolvedFieldDifferent) {
+            return resolvedField.get().getDefaultValue();
+        }
+
+        return null;
+    }
+
+    private Optional<RecordField> resolveField(final RecordField field) {
+        Optional<RecordField> resolved = schema.getField(field.getFieldName());
+        if (resolved.isPresent()) {
+            return resolved;
+        }
+
+        for (final String alias : field.getAliases()) {
+            resolved = schema.getField(alias);
+            if (resolved.isPresent()) {
+                return resolved;
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private Object getExplicitValue(final RecordField field) {
+        final String canonicalFieldName = field.getFieldName();
+
+        // We use containsKey here instead of just calling get() and checking for a null value
+        // because if the true field name is set to null, we want to return null, rather than
+        // what the alias points to. Likewise for a specific alias, since aliases are defined
+        // in a List with a specific ordering.
+        Object value = values.get(canonicalFieldName);
+        if (value != null) {
+            return value;
+        }
+
+        for (final String alias : field.getAliases()) {
+            value = values.get(alias);
+            if (value != null) {
+                return value;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String getAsString(final String fieldName) {
+        final Optional<DataType> dataTypeOption = schema.getDataType(fieldName);
+        if (!dataTypeOption.isPresent()) {
+            return null;
+        }
+
+        return convertToString(getValue(fieldName), dataTypeOption.get().getFormat());
+    }
+
+    @Override
+    public String getAsString(final String fieldName, final String format) {
+        return convertToString(getValue(fieldName), format);
+    }
+
+    @Override
+    public String getAsString(final RecordField field, final String format) {
+        return convertToString(getValue(field), format);
+    }
+
+    private String getFormat(final String optionalFormat, final RecordFieldType fieldType) {
+        return (optionalFormat == null) ? fieldType.getDefaultFormat() : optionalFormat;
+    }
+
+    private String convertToString(final Object value, final String format) {
+        if (value == null) {
+            return null;
+        }
+
+        final String dateFormat = getFormat(format, RecordFieldType.DATE);
+        final String timestampFormat = getFormat(format, RecordFieldType.TIMESTAMP);
+        final String timeFormat = getFormat(format, RecordFieldType.TIME);
+        return DataTypeUtils.toString(value, dateFormat, timeFormat, timestampFormat);
+    }
+
+    @Override
+    public Long getAsLong(final String fieldName) {
+        return DataTypeUtils.toLong(getValue(fieldName), fieldName);
+    }
+
+    @Override
+    public Integer getAsInt(final String fieldName) {
+        return DataTypeUtils.toInteger(getValue(fieldName), fieldName);
+    }
+
+    @Override
+    public Double getAsDouble(final String fieldName) {
+        return DataTypeUtils.toDouble(getValue(fieldName), fieldName);
+    }
+
+    @Override
+    public Float getAsFloat(final String fieldName) {
+        return DataTypeUtils.toFloat(getValue(fieldName), fieldName);
+    }
+
+    @Override
+    public Record getAsRecord(String fieldName, final RecordSchema schema) {
+        return DataTypeUtils.toRecord(getValue(fieldName), schema, fieldName);
+    }
+
+    @Override
+    public Boolean getAsBoolean(final String fieldName) {
+        return DataTypeUtils.toBoolean(getValue(fieldName), fieldName);
+    }
+
+    @Override
+    public Date getAsDate(final String fieldName, final String format) {
+        return DataTypeUtils.toDate(getValue(fieldName), format, fieldName);
+    }
+
+    @Override
+    public Object[] getAsArray(final String fieldName) {
+        return DataTypeUtils.toArray(getValue(fieldName), fieldName);
+    }
+
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * values.hashCode() + 7 * schema.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof MapRecord)) {
+            return false;
+        }
+        final MapRecord other = (MapRecord) obj;
+        return schema.equals(other.schema) && values.equals(other.values);
+    }
+
+    @Override
+    public String toString() {
+        return "MapRecord[values=" + values + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
new file mode 100644
index 0000000..a186611
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+
+public class PushBackRecordSet implements RecordSet {
+    private final RecordSet original;
+    private Record pushback;
+
+    public PushBackRecordSet(final RecordSet original) {
+        this.original = original;
+    }
+
+    @Override
+    public RecordSchema getSchema() throws IOException {
+        return original.getSchema();
+    }
+
+    @Override
+    public Record next() throws IOException {
+        if (pushback != null) {
+            final Record record = pushback;
+            pushback = null;
+            return record;
+        }
+
+        return original.next();
+    }
+
+    public void pushback(final Record record) {
+        if (pushback != null) {
+            throw new IllegalStateException("RecordSet already has a Record pushed back. Cannot push back more than one record at a time.");
+        }
+
+        this.pushback = record;
+    }
+
+    public boolean isAnotherRecord() throws IOException {
+        if (pushback != null) {
+            return true;
+        }
+
+        final Record nextRecord = next();
+        if (nextRecord == null) {
+            return false;
+        }
+
+        pushback(nextRecord);
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
new file mode 100644
index 0000000..5e5e7ba
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/Record.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Date;
+
+public interface Record {
+
+    RecordSchema getSchema();
+
+    /**
+     * <p>
+     * Returns a view of the the values of the fields in this Record.
+     * </p>
+     *
+     * <b>NOTE:</b> The array that is returned may be an underlying array that is backing
+     * the contents of the Record. As such, modifying the array in any way may result in
+     * modifying the record.
+     *
+     * @return a view of the values of the fields in this Record
+     */
+    Object[] getValues();
+
+    Object getValue(String fieldName);
+
+    Object getValue(RecordField field);
+
+    String getAsString(String fieldName);
+
+    String getAsString(String fieldName, String format);
+
+    String getAsString(RecordField field, String format);
+
+    Long getAsLong(String fieldName);
+
+    Integer getAsInt(String fieldName);
+
+    Double getAsDouble(String fieldName);
+
+    Float getAsFloat(String fieldName);
+
+    Record getAsRecord(String fieldName, RecordSchema schema);
+
+    Boolean getAsBoolean(String fieldName);
+
+    Date getAsDate(String fieldName, String format);
+
+    Object[] getAsArray(String fieldName);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
new file mode 100644
index 0000000..dc68b01
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+
+public class RecordField {
+    private final String fieldName;
+    private final DataType dataType;
+    private final Set<String> aliases;
+    private final Object defaultValue;
+
+    public RecordField(final String fieldName, final DataType dataType) {
+        this(fieldName, dataType, null, Collections.emptySet());
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final Object defaultValue) {
+        this(fieldName, dataType, defaultValue, Collections.emptySet());
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final Set<String> aliases) {
+        this(fieldName, dataType, null, aliases);
+    }
+
+    public RecordField(final String fieldName, final DataType dataType, final Object defaultValue, final Set<String> aliases) {
+        if (defaultValue != null && !DataTypeUtils.isCompatibleDataType(defaultValue, dataType)) {
+            throw new IllegalArgumentException("Cannot set the default value for field [" + fieldName + "] to [" + defaultValue
+                + "] because that is not a valid value for Data Type [" + dataType + "]");
+        }
+
+        this.fieldName = Objects.requireNonNull(fieldName);
+        this.dataType = Objects.requireNonNull(dataType);
+        this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
+        this.defaultValue = defaultValue;
+    }
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    public Set<String> getAliases() {
+        return aliases;
+    }
+
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    public Object getDefaultValue() {
+        return defaultValue;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + dataType.hashCode();
+        result = prime * result + fieldName.hashCode();
+        result = prime * result + aliases.hashCode();
+        result = prime * result + ((defaultValue == null) ? 0 : defaultValue.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        RecordField other = (RecordField) obj;
+        return dataType.equals(other.getDataType()) && fieldName.equals(other.getFieldName()) && aliases.equals(other.getAliases()) && Objects.equals(defaultValue, other.defaultValue);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
new file mode 100644
index 0000000..2e6b5d7
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordFieldType.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public enum RecordFieldType {
+    /**
+     * A String field type. Fields of this type use a {@code java.lang.String} value.
+     */
+    STRING("string"),
+
+    /**
+     * A boolean field type. Fields of this type use a {@code boolean} value.
+     */
+    BOOLEAN("boolean"),
+
+    /**
+     * A byte field type. Fields of this type use a {@code byte} value.
+     */
+    BYTE("byte"),
+
+    /**
+     * A char field type. Fields of this type use a {@code char} value.
+     */
+    CHAR("char"),
+
+    /**
+     * A short field type. Fields of this type use a {@code short} value.
+     */
+    SHORT("short"),
+
+    /**
+     * An int field type. Fields of this type use an {@code int} value.
+     */
+    INT("int"),
+
+    /**
+     * A bigint field type. Fields of this type use a {@code java.math.BigInteger} value.
+     */
+    BIGINT("bigint"),
+
+    /**
+     * A long field type. Fields of this type use a {@code long} value.
+     */
+    LONG("long"),
+
+    /**
+     * A float field type. Fields of this type use a {@code float} value.
+     */
+    FLOAT("float"),
+
+    /**
+     * A double field type. Fields of this type use a {@code double} value.
+     */
+    DOUBLE("double"),
+
+    /**
+     * A date field type. Fields of this type use a {@code java.sql.Date} value.
+     */
+    DATE("date", "yyyy-MM-dd"),
+
+    /**
+     * A time field type. Fields of this type use a {@code java.sql.Time} value.
+     */
+    TIME("time", "HH:mm:ss"),
+
+    /**
+     * A timestamp field type. Fields of this type use a {@code java.sql.Timestamp} value.
+     */
+    TIMESTAMP("timestamp", "yyyy-MM-dd HH:mm:ss"),
+
+    /**
+     * <p>
+     * A record field type. Fields of this type use a {@code org.apache.nifi.serialization.record.Record} value. A Record DataType should be
+     * created by providing the {@link RecordSchema} for the record:
+     * </p>
+     *
+     * <code>
+     * final DataType recordType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
+     * </code>
+     *
+     * <p>
+     * A field of type RECORD should always have a {@link RecordDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.RECORD) {
+     *     final RecordDataType recordDataType = (RecordDataType) dataType;
+     *     final RecordSchema childSchema = recordDataType.getChildSchema();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    RECORD("record", null, new RecordDataType(null)),
+
+    /**
+     * <p>
+     * A choice field type. A field of type choice can be one of any number of different types, which are defined by the DataType that is used.
+     * For example, if a field should allow either a Long or an Integer, this can be accomplished by using:
+     * </p>
+     *
+     * <code>
+     * final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType( RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType() );
+     * </code>
+     *
+     * <p>
+     * A field of type CHOICE should always have a {@link ChoiceDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+     *     final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+     *     final List&lt;DataType&gt; allowableTypes = choiceDataType.getPossibleSubTypes();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    CHOICE("choice", null, new ChoiceDataType(Collections.emptyList())),
+
+    /**
+     * <p>
+     * An array field type. Fields of this type use a {@code Object[]} value. Note that we are explicitly indicating that
+     * Object[] should be used here and not primitive array types. For instance, setting a value of {@code int[]} is not allowed. The DataType for
+     * this field should be created using the {@link #getArrayDataType(DataType)} method:
+     * </p>
+     *
+     * <code>
+     * final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType( RecordFieldType.INT.getDataType() );
+     * </code>
+     *
+     * <p>
+     * A field of type ARRAY should always have an {@link ArrayDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.ARRAY) {
+     *     final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+     *     final DataType elementType = arrayDataType.getElementType();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    ARRAY("array", null, new ArrayDataType(null)),
+
+    /**
+     * <p>
+     * A record field type. Fields of this type use a {@code Map<String, Object>} value. A Map DataType should be
+     * created by providing the {@link DataType} for the values:
+     * </p>
+     *
+     * <code>
+     * final DataType recordType = RecordFieldType.MAP.getRecordDataType( RecordFieldType.STRING.getDataType() );
+     * </code>
+     *
+     * <p>
+     * A field of type MAP should always have a {@link MapDataType}, so the following idiom is acceptable for use:
+     * </p>
+     *
+     * <code>
+     * <pre>
+     * final DataType dataType = ...;
+     * if (dataType.getFieldType() == RecordFieldType.MAP) {
+     *     final MapDataType mapDataType = (MapDataType) dataType;
+     *     final DataType valueType = mapDataType.getValueType();
+     *     ...
+     * }
+     * </pre>
+     * </code>
+     */
+    MAP("map", null, new MapDataType(null));
+
+
+    private static final Map<String, RecordFieldType> SIMPLE_NAME_MAP = new HashMap<String, RecordFieldType>();
+
+    static {
+        for (RecordFieldType value : values()) {
+            SIMPLE_NAME_MAP.put(value.simpleName, value);
+        }
+    }
+
+    private final String simpleName;
+    private final String defaultFormat;
+    private final DataType defaultDataType;
+
+    private RecordFieldType(final String simpleName) {
+        this(simpleName, null);
+    }
+
+    private RecordFieldType(final String simpleName, final String defaultFormat) {
+        this.simpleName = simpleName;
+        this.defaultFormat = defaultFormat;
+        this.defaultDataType = new DataType(this, defaultFormat);
+    }
+
+    private RecordFieldType(final String simpleName, final String defaultFormat, final DataType defaultDataType) {
+        this.simpleName = simpleName;
+        this.defaultFormat = defaultFormat;
+        this.defaultDataType = defaultDataType;
+    }
+
+    public String getDefaultFormat() {
+        return defaultFormat;
+    }
+
+    /**
+     * @return the DataType with the default format
+     */
+    public DataType getDataType() {
+        return defaultDataType;
+    }
+
+    public DataType getDataType(final String format) {
+        return new DataType(this, format);
+    }
+
+    /**
+     * Returns a Data Type that represents a "RECORD" or "ARRAY" type with the given schema.
+     *
+     * @param childSchema the Schema for the Record or Array
+     * @return a DataType that represents a Record or Array with the given schema, or <code>null</code> if this RecordFieldType
+     *         is not the RECORD or ARRAY type.
+     */
+    public DataType getRecordDataType(final RecordSchema childSchema) {
+        if (this != RECORD) {
+            return null;
+        }
+
+        return new RecordDataType(childSchema);
+    }
+
+    /**
+     * Returns a Data Type that represents an "ARRAY" type with the given element type.
+     *
+     * @param elementType the type of the arrays in the element
+     * @return a DataType that represents an Array with the given element type, or <code>null</code> if this RecordFieldType
+     *         is not the ARRAY type.
+     */
+    public DataType getArrayDataType(final DataType elementType) {
+        if (this != ARRAY) {
+            return null;
+        }
+
+        return new ArrayDataType(elementType);
+    }
+
+
+    /**
+     * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is
+     * only applicable for a RecordFieldType of {@link #CHOICE}.
+     *
+     * @param possibleChildTypes the possible types that are allowable
+     * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
+     *         is not the CHOICE type.
+     */
+    public DataType getChoiceDataType(final List<DataType> possibleChildTypes) {
+        if (this != CHOICE) {
+            return null;
+        }
+
+        return new ChoiceDataType(possibleChildTypes);
+    }
+
+    /**
+     * Returns a Data Type that represents a "CHOICE" of multiple possible types. This method is
+     * only applicable for a RecordFieldType of {@link #CHOICE}.
+     *
+     * @param possibleChildTypes the possible types that are allowable
+     * @return a DataType that represents a "CHOICE" of multiple possible types, or <code>null</code> if this RecordFieldType
+     *         is not the CHOICE type.
+     */
+    public DataType getChoiceDataType(final DataType... possibleChildTypes) {
+        if (this != CHOICE) {
+            return null;
+        }
+
+        final List<DataType> list = new ArrayList<>(possibleChildTypes.length);
+        for (final DataType type : possibleChildTypes) {
+            list.add(type);
+        }
+
+        return new ChoiceDataType(list);
+    }
+
+    /**
+     * Returns a Data Type that represents a "MAP" type with the given value type.
+     *
+     * @param valueDataType the type of the values in the map
+     * @return a DataType that represents a Map with the given value type, or <code>null</code> if this RecordFieldType
+     *         is not the MAP type.
+     */
+    public DataType getMapDataType(final DataType valueDataType) {
+        if (this != MAP) {
+            return null;
+        }
+
+        return new MapDataType(valueDataType);
+    }
+
+
+    public static RecordFieldType of(final String typeString) {
+      return SIMPLE_NAME_MAP.get(typeString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
new file mode 100644
index 0000000..367f2b0
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.List;
+import java.util.Optional;
+
+public interface RecordSchema {
+    /**
+     * @return the list of fields that are present in the schema
+     */
+    List<RecordField> getFields();
+
+    /**
+     * @return the number of fields in the schema
+     */
+    int getFieldCount();
+
+    /**
+     * @param index the 0-based index of which field to return
+     * @return the index'th field
+     *
+     * @throws IndexOutOfBoundsException if the index is < 0 or >= the number of fields (determined by {@link #getFieldCount()}).
+     */
+    RecordField getField(int index);
+
+    /**
+     * @return the data types of the fields
+     */
+    List<DataType> getDataTypes();
+
+    /**
+     * @return the names of the fields
+     */
+    List<String> getFieldNames();
+
+    /**
+     * @param fieldName the name of the field whose type is desired
+     * @return the RecordFieldType associated with the field that has the given name, or
+     *         <code>null</code> if the schema does not contain a field with the given name
+     */
+    Optional<DataType> getDataType(String fieldName);
+
+    /**
+     * @return the textual representation of the schema, if one is available
+     */
+    Optional<String> getSchemaText();
+
+    /**
+     * @return the format of the schema text, if schema text is present
+     */
+    Optional<String> getSchemaFormat();
+
+    /**
+     * @param fieldName the name of the field
+     * @return an Optional RecordField for the field with the given name
+     */
+    Optional<RecordField> getField(String fieldName);
+
+    /**
+     * @return the SchemaIdentifier, which provides various attributes for identifying a schema
+     */
+    SchemaIdentifier getIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
new file mode 100644
index 0000000..9e67346
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+
+public interface RecordSet {
+
+    /**
+     * @return the {@link RecordSchema} that applies to the records in this RecordSet
+     */
+    RecordSchema getSchema() throws IOException;
+
+    /**
+     * @return the next {@link Record} in the set or <code>null</code> if there are no more records
+     */
+    Record next() throws IOException;
+
+    /**
+     * Returns a new Record Set that will return no more than {@code maxRecords} records from this
+     * RecordSet. Any Records that are pulled from this newly created RecordSet will also advance
+     * the cursor in this Record Set and vice versa.
+     *
+     * @param maxRecords the maximum number of records to return from the new RecordSet
+     * @return a view of this RecordSet that limits the number of records returned
+     */
+    default RecordSet limit(final int maxRecords) {
+        if (maxRecords < 0) {
+            throw new IllegalArgumentException("Cannot limit number of records to " + maxRecords + ". Limit must be a non-negative integer");
+        }
+
+        final RecordSet original = this;
+        return new RecordSet() {
+            private int count = 0;
+
+            @Override
+            public RecordSchema getSchema() throws IOException {
+                return original.getSchema();
+            }
+
+            @Override
+            public Record next() throws IOException {
+                if (count >= maxRecords) {
+                    return null;
+                }
+
+                final Record record = original.next();
+                if (record != null) {
+                    count++;
+                }
+
+                return record;
+            }
+        };
+    }
+
+    public static RecordSet of(final RecordSchema schema, final Record... records) {
+        return new RecordSet() {
+            private int index = 0;
+
+            @Override
+            public RecordSchema getSchema() {
+                return schema;
+            }
+
+            @Override
+            public Record next() {
+                if (index >= records.length) {
+                    return null;
+                }
+
+                return records[index++];
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
new file mode 100644
index 0000000..b6daab7
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ResultSetRecordSet implements RecordSet, Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
+    private final ResultSet rs;
+    private final RecordSchema schema;
+    private final Set<String> rsColumnNames;
+    private boolean moreRows;
+
+    public ResultSetRecordSet(final ResultSet rs) throws SQLException {
+        this.rs = rs;
+        moreRows = rs.next();
+        this.schema = createSchema(rs);
+
+        rsColumnNames = new HashSet<>();
+        final ResultSetMetaData metadata = rs.getMetaData();
+        for (int i = 0; i < metadata.getColumnCount(); i++) {
+            rsColumnNames.add(metadata.getColumnLabel(i + 1));
+        }
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    public Record next() throws IOException {
+        try {
+            if (moreRows) {
+                final Record record = createRecord(rs);
+                moreRows = rs.next();
+                return record;
+            } else {
+                return null;
+            }
+        } catch (final SQLException e) {
+            throw new IOException("Could not obtain next record from ResultSet", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            rs.close();
+        } catch (final SQLException e) {
+            logger.error("Failed to close ResultSet", e);
+        }
+    }
+
+    private Record createRecord(final ResultSet rs) throws SQLException {
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+        for (final RecordField field : schema.getFields()) {
+            final String fieldName = field.getFieldName();
+
+            final Object value;
+            if (rsColumnNames.contains(fieldName)) {
+                value = normalizeValue(rs.getObject(fieldName));
+            } else {
+                value = null;
+            }
+
+            values.put(fieldName, value);
+        }
+
+        return new MapRecord(schema, values);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private Object normalizeValue(final Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof List) {
+            return ((List) value).toArray();
+        }
+
+        return value;
+    }
+
+    private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
+        final ResultSetMetaData metadata = rs.getMetaData();
+        final int numCols = metadata.getColumnCount();
+        final List<RecordField> fields = new ArrayList<>(numCols);
+
+        for (int i = 0; i < numCols; i++) {
+            final int column = i + 1;
+            final int sqlType = metadata.getColumnType(column);
+
+            final DataType dataType = getDataType(sqlType, rs, column);
+            final String fieldName = metadata.getColumnLabel(column);
+            final RecordField field = new RecordField(fieldName, dataType);
+            fields.add(field);
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
+        switch (sqlType) {
+            case Types.ARRAY:
+                // The JDBC API does not allow us to know what the base type of an array is through the metadata.
+                // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
+                // the base type. However, if the base type is, itself, an array, we will simply return a base type of
+                // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
+                // support calling Array.getResultSet() and will throw an Exception if that is not supported.
+                if (rs.isAfterLast()) {
+                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+                }
+
+                final Array array = rs.getArray(columnIndex);
+                if (array == null) {
+                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+                }
+
+                final DataType baseType = getArrayBaseType(array);
+                return RecordFieldType.ARRAY.getArrayDataType(baseType);
+            case Types.BINARY:
+            case Types.LONGVARBINARY:
+            case Types.VARBINARY:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case Types.OTHER:
+                // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
+                if (rs.isAfterLast()) {
+                    return RecordFieldType.RECORD.getDataType();
+                }
+
+                final Object obj = rs.getObject(columnIndex);
+                if (obj == null || !(obj instanceof Record)) {
+                    return RecordFieldType.RECORD.getDataType();
+                }
+
+                final Record record = (Record) obj;
+                final RecordSchema recordSchema = record.getSchema();
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+            default:
+                return getFieldType(sqlType).getDataType();
+        }
+    }
+
+    private static DataType getArrayBaseType(final Array array) throws SQLException {
+        final Object arrayValue = array.getArray();
+        if (arrayValue == null) {
+            return RecordFieldType.STRING.getDataType();
+        }
+
+        if (arrayValue instanceof byte[]) {
+            return RecordFieldType.BYTE.getDataType();
+        }
+        if (arrayValue instanceof int[]) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (arrayValue instanceof long[]) {
+            return RecordFieldType.LONG.getDataType();
+        }
+        if (arrayValue instanceof boolean[]) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (arrayValue instanceof short[]) {
+            return RecordFieldType.SHORT.getDataType();
+        }
+        if (arrayValue instanceof byte[]) {
+            return RecordFieldType.BYTE.getDataType();
+        }
+        if (arrayValue instanceof float[]) {
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (arrayValue instanceof double[]) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (arrayValue instanceof char[]) {
+            return RecordFieldType.CHAR.getDataType();
+        }
+        if (arrayValue instanceof Object[]) {
+            final Object[] values = (Object[]) arrayValue;
+            if (values.length == 0) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            Object valueToLookAt = null;
+            for (int i = 0; i < values.length; i++) {
+                valueToLookAt = values[i];
+                if (valueToLookAt != null) {
+                    break;
+                }
+            }
+            if (valueToLookAt == null) {
+                return RecordFieldType.STRING.getDataType();
+            }
+
+            if (valueToLookAt instanceof String) {
+                return RecordFieldType.STRING.getDataType();
+            }
+            if (valueToLookAt instanceof Long) {
+                return RecordFieldType.LONG.getDataType();
+            }
+            if (valueToLookAt instanceof Integer) {
+                return RecordFieldType.INT.getDataType();
+            }
+            if (valueToLookAt instanceof Short) {
+                return RecordFieldType.SHORT.getDataType();
+            }
+            if (valueToLookAt instanceof Byte) {
+                return RecordFieldType.BYTE.getDataType();
+            }
+            if (valueToLookAt instanceof Float) {
+                return RecordFieldType.FLOAT.getDataType();
+            }
+            if (valueToLookAt instanceof Double) {
+                return RecordFieldType.DOUBLE.getDataType();
+            }
+            if (valueToLookAt instanceof Boolean) {
+                return RecordFieldType.BOOLEAN.getDataType();
+            }
+            if (valueToLookAt instanceof Character) {
+                return RecordFieldType.CHAR.getDataType();
+            }
+            if (valueToLookAt instanceof BigInteger) {
+                return RecordFieldType.BIGINT.getDataType();
+            }
+            if (valueToLookAt instanceof Integer) {
+                return RecordFieldType.INT.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Time) {
+                return RecordFieldType.TIME.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Date) {
+                return RecordFieldType.DATE.getDataType();
+            }
+            if (valueToLookAt instanceof java.sql.Timestamp) {
+                return RecordFieldType.TIMESTAMP.getDataType();
+            }
+            if (valueToLookAt instanceof Record) {
+                return RecordFieldType.RECORD.getDataType();
+            }
+        }
+
+        return RecordFieldType.STRING.getDataType();
+    }
+
+
+    private static RecordFieldType getFieldType(final int sqlType) {
+        switch (sqlType) {
+            case Types.BIGINT:
+            case Types.ROWID:
+                return RecordFieldType.LONG;
+            case Types.BIT:
+            case Types.BOOLEAN:
+                return RecordFieldType.BOOLEAN;
+            case Types.CHAR:
+                return RecordFieldType.CHAR;
+            case Types.DATE:
+                return RecordFieldType.DATE;
+            case Types.DECIMAL:
+            case Types.DOUBLE:
+            case Types.NUMERIC:
+            case Types.REAL:
+                return RecordFieldType.DOUBLE;
+            case Types.FLOAT:
+                return RecordFieldType.FLOAT;
+            case Types.INTEGER:
+                return RecordFieldType.INT;
+            case Types.SMALLINT:
+                return RecordFieldType.SHORT;
+            case Types.TINYINT:
+                return RecordFieldType.BYTE;
+            case Types.LONGNVARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NULL:
+            case Types.NVARCHAR:
+            case Types.VARCHAR:
+                return RecordFieldType.STRING;
+            case Types.OTHER:
+            case Types.JAVA_OBJECT:
+                return RecordFieldType.RECORD;
+            case Types.TIME:
+            case Types.TIME_WITH_TIMEZONE:
+                return RecordFieldType.TIME;
+            case Types.TIMESTAMP:
+            case Types.TIMESTAMP_WITH_TIMEZONE:
+                return RecordFieldType.TIMESTAMP;
+        }
+
+        return RecordFieldType.STRING;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
new file mode 100644
index 0000000..d7f5664
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public interface SchemaIdentifier {
+
+    /**
+     * @return the name of the schema, if one has been defined.
+     */
+    Optional<String> getName();
+
+    /**
+     * @return the identifier of the schema, if one has been defined.
+     */
+    OptionalLong getIdentifier();
+
+    /**
+     * @return the version of the schema, if one has been defined.
+     */
+    OptionalInt getVersion();
+
+
+    public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null);
+
+    public static SchemaIdentifier ofName(final String name) {
+        return new StandardSchemaIdentifier(name, null, null);
+    }
+
+    public static SchemaIdentifier of(final String name, final long identifier, final int version) {
+        return new StandardSchemaIdentifier(name, identifier, version);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
new file mode 100644
index 0000000..86db284
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+public class StandardSchemaIdentifier implements SchemaIdentifier {
+    private final Optional<String> name;
+    private final OptionalLong identifier;
+    private final OptionalInt version;
+
+    StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) {
+        this.name = Optional.ofNullable(name);
+        this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
+        this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
+    }
+
+    @Override
+    public Optional<String> getName() {
+        return name;
+    }
+
+    @Override
+    public OptionalLong getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public OptionalInt getVersion() {
+        return version;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof SchemaIdentifier)) {
+            return false;
+        }
+        final SchemaIdentifier other = (SchemaIdentifier) obj;
+        return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
new file mode 100644
index 0000000..af5f909
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+public class TypeMismatchException extends RuntimeException {
+    public TypeMismatchException(String message) {
+        super(message);
+    }
+
+    public TypeMismatchException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
new file mode 100644
index 0000000..46dc447
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Objects;
+
+public class ArrayDataType extends DataType {
+    private final DataType elementType;
+
+    public ArrayDataType(final DataType elementType) {
+        super(RecordFieldType.ARRAY, null);
+        this.elementType = elementType;
+    }
+
+    public DataType getElementType() {
+        return elementType;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.ARRAY;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof ArrayDataType)) {
+            return false;
+        }
+
+        final ArrayDataType other = (ArrayDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
+    }
+
+    @Override
+    public String toString() {
+        return "ARRAY[" + elementType + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
new file mode 100644
index 0000000..9fcdf73
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ChoiceDataType extends DataType {
+    private final List<DataType> possibleSubTypes;
+
+    public ChoiceDataType(final List<DataType> possibleSubTypes) {
+        super(RecordFieldType.CHOICE, null);
+        this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes);
+    }
+
+    public List<DataType> getPossibleSubTypes() {
+        return possibleSubTypes;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.CHOICE;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof ChoiceDataType)) {
+            return false;
+        }
+
+        final ChoiceDataType other = (ChoiceDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes);
+    }
+
+    @Override
+    public String toString() {
+        return "CHOICE" + possibleSubTypes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
new file mode 100644
index 0000000..5ed1c39
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Objects;
+
+public class MapDataType extends DataType {
+    private final DataType valueType;
+
+    public MapDataType(final DataType elementType) {
+        super(RecordFieldType.MAP, null);
+        this.valueType = elementType;
+    }
+
+    public DataType getValueType() {
+        return valueType;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.MAP;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof MapDataType)) {
+            return false;
+        }
+
+        final MapDataType other = (MapDataType) obj;
+        return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType);
+    }
+
+    @Override
+    public String toString() {
+        return "MAP[" + valueType + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
new file mode 100644
index 0000000..fc6993f
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.type;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.util.Objects;
+
+public class RecordDataType extends DataType {
+    private final RecordSchema childSchema;
+
+    public RecordDataType(final RecordSchema childSchema) {
+        super(RecordFieldType.RECORD, null);
+        this.childSchema = childSchema;
+    }
+
+    @Override
+    public RecordFieldType getFieldType() {
+        return RecordFieldType.RECORD;
+    }
+
+    public RecordSchema getChildSchema() {
+        return childSchema;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof RecordDataType)) {
+            return false;
+        }
+
+        final RecordDataType other = (RecordDataType) obj;
+        return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
+    }
+
+    @Override
+    public String toString() {
+        return RecordFieldType.RECORD.toString();
+    }
+}