You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:11:56 UTC

[03/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecorderParser and MockRecordWriter - R

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
deleted file mode 100644
index b6daab7..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.sql.Array;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ResultSetRecordSet implements RecordSet, Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(ResultSetRecordSet.class);
-    private final ResultSet rs;
-    private final RecordSchema schema;
-    private final Set<String> rsColumnNames;
-    private boolean moreRows;
-
-    public ResultSetRecordSet(final ResultSet rs) throws SQLException {
-        this.rs = rs;
-        moreRows = rs.next();
-        this.schema = createSchema(rs);
-
-        rsColumnNames = new HashSet<>();
-        final ResultSetMetaData metadata = rs.getMetaData();
-        for (int i = 0; i < metadata.getColumnCount(); i++) {
-            rsColumnNames.add(metadata.getColumnLabel(i + 1));
-        }
-    }
-
-    @Override
-    public RecordSchema getSchema() {
-        return schema;
-    }
-
-    @Override
-    public Record next() throws IOException {
-        try {
-            if (moreRows) {
-                final Record record = createRecord(rs);
-                moreRows = rs.next();
-                return record;
-            } else {
-                return null;
-            }
-        } catch (final SQLException e) {
-            throw new IOException("Could not obtain next record from ResultSet", e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            rs.close();
-        } catch (final SQLException e) {
-            logger.error("Failed to close ResultSet", e);
-        }
-    }
-
-    private Record createRecord(final ResultSet rs) throws SQLException {
-        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
-
-        for (final RecordField field : schema.getFields()) {
-            final String fieldName = field.getFieldName();
-
-            final Object value;
-            if (rsColumnNames.contains(fieldName)) {
-                value = normalizeValue(rs.getObject(fieldName));
-            } else {
-                value = null;
-            }
-
-            values.put(fieldName, value);
-        }
-
-        return new MapRecord(schema, values);
-    }
-
-    @SuppressWarnings("rawtypes")
-    private Object normalizeValue(final Object value) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof List) {
-            return ((List) value).toArray();
-        }
-
-        return value;
-    }
-
-    private static RecordSchema createSchema(final ResultSet rs) throws SQLException {
-        final ResultSetMetaData metadata = rs.getMetaData();
-        final int numCols = metadata.getColumnCount();
-        final List<RecordField> fields = new ArrayList<>(numCols);
-
-        for (int i = 0; i < numCols; i++) {
-            final int column = i + 1;
-            final int sqlType = metadata.getColumnType(column);
-
-            final DataType dataType = getDataType(sqlType, rs, column);
-            final String fieldName = metadata.getColumnLabel(column);
-            final RecordField field = new RecordField(fieldName, dataType);
-            fields.add(field);
-        }
-
-        return new SimpleRecordSchema(fields);
-    }
-
-    private static DataType getDataType(final int sqlType, final ResultSet rs, final int columnIndex) throws SQLException {
-        switch (sqlType) {
-            case Types.ARRAY:
-                // The JDBC API does not allow us to know what the base type of an array is through the metadata.
-                // As a result, we have to obtain the actual Array for this record. Once we have this, we can determine
-                // the base type. However, if the base type is, itself, an array, we will simply return a base type of
-                // String because otherwise, we need the ResultSet for the array itself, and many JDBC Drivers do not
-                // support calling Array.getResultSet() and will throw an Exception if that is not supported.
-                if (rs.isAfterLast()) {
-                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
-                }
-
-                final Array array = rs.getArray(columnIndex);
-                if (array == null) {
-                    return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
-                }
-
-                final DataType baseType = getArrayBaseType(array);
-                return RecordFieldType.ARRAY.getArrayDataType(baseType);
-            case Types.BINARY:
-            case Types.LONGVARBINARY:
-            case Types.VARBINARY:
-                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
-            case Types.OTHER:
-                // If we have no records to inspect, we can't really know its schema so we simply use the default data type.
-                if (rs.isAfterLast()) {
-                    return RecordFieldType.RECORD.getDataType();
-                }
-
-                final Object obj = rs.getObject(columnIndex);
-                if (obj == null || !(obj instanceof Record)) {
-                    return RecordFieldType.RECORD.getDataType();
-                }
-
-                final Record record = (Record) obj;
-                final RecordSchema recordSchema = record.getSchema();
-                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
-            default:
-                return getFieldType(sqlType).getDataType();
-        }
-    }
-
-    private static DataType getArrayBaseType(final Array array) throws SQLException {
-        final Object arrayValue = array.getArray();
-        if (arrayValue == null) {
-            return RecordFieldType.STRING.getDataType();
-        }
-
-        if (arrayValue instanceof byte[]) {
-            return RecordFieldType.BYTE.getDataType();
-        }
-        if (arrayValue instanceof int[]) {
-            return RecordFieldType.INT.getDataType();
-        }
-        if (arrayValue instanceof long[]) {
-            return RecordFieldType.LONG.getDataType();
-        }
-        if (arrayValue instanceof boolean[]) {
-            return RecordFieldType.BOOLEAN.getDataType();
-        }
-        if (arrayValue instanceof short[]) {
-            return RecordFieldType.SHORT.getDataType();
-        }
-        if (arrayValue instanceof byte[]) {
-            return RecordFieldType.BYTE.getDataType();
-        }
-        if (arrayValue instanceof float[]) {
-            return RecordFieldType.FLOAT.getDataType();
-        }
-        if (arrayValue instanceof double[]) {
-            return RecordFieldType.DOUBLE.getDataType();
-        }
-        if (arrayValue instanceof char[]) {
-            return RecordFieldType.CHAR.getDataType();
-        }
-        if (arrayValue instanceof Object[]) {
-            final Object[] values = (Object[]) arrayValue;
-            if (values.length == 0) {
-                return RecordFieldType.STRING.getDataType();
-            }
-
-            Object valueToLookAt = null;
-            for (int i = 0; i < values.length; i++) {
-                valueToLookAt = values[i];
-                if (valueToLookAt != null) {
-                    break;
-                }
-            }
-            if (valueToLookAt == null) {
-                return RecordFieldType.STRING.getDataType();
-            }
-
-            if (valueToLookAt instanceof String) {
-                return RecordFieldType.STRING.getDataType();
-            }
-            if (valueToLookAt instanceof Long) {
-                return RecordFieldType.LONG.getDataType();
-            }
-            if (valueToLookAt instanceof Integer) {
-                return RecordFieldType.INT.getDataType();
-            }
-            if (valueToLookAt instanceof Short) {
-                return RecordFieldType.SHORT.getDataType();
-            }
-            if (valueToLookAt instanceof Byte) {
-                return RecordFieldType.BYTE.getDataType();
-            }
-            if (valueToLookAt instanceof Float) {
-                return RecordFieldType.FLOAT.getDataType();
-            }
-            if (valueToLookAt instanceof Double) {
-                return RecordFieldType.DOUBLE.getDataType();
-            }
-            if (valueToLookAt instanceof Boolean) {
-                return RecordFieldType.BOOLEAN.getDataType();
-            }
-            if (valueToLookAt instanceof Character) {
-                return RecordFieldType.CHAR.getDataType();
-            }
-            if (valueToLookAt instanceof BigInteger) {
-                return RecordFieldType.BIGINT.getDataType();
-            }
-            if (valueToLookAt instanceof Integer) {
-                return RecordFieldType.INT.getDataType();
-            }
-            if (valueToLookAt instanceof java.sql.Time) {
-                return RecordFieldType.TIME.getDataType();
-            }
-            if (valueToLookAt instanceof java.sql.Date) {
-                return RecordFieldType.DATE.getDataType();
-            }
-            if (valueToLookAt instanceof java.sql.Timestamp) {
-                return RecordFieldType.TIMESTAMP.getDataType();
-            }
-            if (valueToLookAt instanceof Record) {
-                return RecordFieldType.RECORD.getDataType();
-            }
-        }
-
-        return RecordFieldType.STRING.getDataType();
-    }
-
-
-    private static RecordFieldType getFieldType(final int sqlType) {
-        switch (sqlType) {
-            case Types.BIGINT:
-            case Types.ROWID:
-                return RecordFieldType.LONG;
-            case Types.BIT:
-            case Types.BOOLEAN:
-                return RecordFieldType.BOOLEAN;
-            case Types.CHAR:
-                return RecordFieldType.CHAR;
-            case Types.DATE:
-                return RecordFieldType.DATE;
-            case Types.DECIMAL:
-            case Types.DOUBLE:
-            case Types.NUMERIC:
-            case Types.REAL:
-                return RecordFieldType.DOUBLE;
-            case Types.FLOAT:
-                return RecordFieldType.FLOAT;
-            case Types.INTEGER:
-                return RecordFieldType.INT;
-            case Types.SMALLINT:
-                return RecordFieldType.SHORT;
-            case Types.TINYINT:
-                return RecordFieldType.BYTE;
-            case Types.LONGNVARCHAR:
-            case Types.LONGVARCHAR:
-            case Types.NCHAR:
-            case Types.NULL:
-            case Types.NVARCHAR:
-            case Types.VARCHAR:
-                return RecordFieldType.STRING;
-            case Types.OTHER:
-            case Types.JAVA_OBJECT:
-                return RecordFieldType.RECORD;
-            case Types.TIME:
-            case Types.TIME_WITH_TIMEZONE:
-                return RecordFieldType.TIME;
-            case Types.TIMESTAMP:
-            case Types.TIMESTAMP_WITH_TIMEZONE:
-                return RecordFieldType.TIMESTAMP;
-        }
-
-        return RecordFieldType.STRING;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
deleted file mode 100644
index d7f5664..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-
-public interface SchemaIdentifier {
-
-    /**
-     * @return the name of the schema, if one has been defined.
-     */
-    Optional<String> getName();
-
-    /**
-     * @return the identifier of the schema, if one has been defined.
-     */
-    OptionalLong getIdentifier();
-
-    /**
-     * @return the version of the schema, if one has been defined.
-     */
-    OptionalInt getVersion();
-
-
-    public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null);
-
-    public static SchemaIdentifier ofName(final String name) {
-        return new StandardSchemaIdentifier(name, null, null);
-    }
-
-    public static SchemaIdentifier of(final String name, final long identifier, final int version) {
-        return new StandardSchemaIdentifier(name, identifier, version);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
deleted file mode 100644
index 86db284..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import java.util.Optional;
-import java.util.OptionalInt;
-import java.util.OptionalLong;
-
-public class StandardSchemaIdentifier implements SchemaIdentifier {
-    private final Optional<String> name;
-    private final OptionalLong identifier;
-    private final OptionalInt version;
-
-    StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) {
-        this.name = Optional.ofNullable(name);
-        this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);;
-        this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);;
-    }
-
-    @Override
-    public Optional<String> getName() {
-        return name;
-    }
-
-    @Override
-    public OptionalLong getIdentifier() {
-        return identifier;
-    }
-
-    @Override
-    public OptionalInt getVersion() {
-        return version;
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof SchemaIdentifier)) {
-            return false;
-        }
-        final SchemaIdentifier other = (SchemaIdentifier) obj;
-        return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion());
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
deleted file mode 100644
index af5f909..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/TypeMismatchException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-public class TypeMismatchException extends RuntimeException {
-    public TypeMismatchException(String message) {
-        super(message);
-    }
-
-    public TypeMismatchException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
deleted file mode 100644
index 0c21239..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ArrayDataType.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class ArrayDataType extends DataType {
-    private final DataType elementType;
-
-    public ArrayDataType(final DataType elementType) {
-        super(RecordFieldType.ARRAY, null);
-        this.elementType = elementType;
-    }
-
-    public DataType getElementType() {
-        return elementType;
-    }
-
-    @Override
-    public RecordFieldType getFieldType() {
-        return RecordFieldType.ARRAY;
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 + 41 * getFieldType().hashCode() + 41 * (elementType == null ? 0 : elementType.hashCode());
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof ArrayDataType)) {
-            return false;
-        }
-
-        final ArrayDataType other = (ArrayDataType) obj;
-        return getFieldType().equals(other.getFieldType()) && Objects.equals(elementType, other.elementType);
-    }
-
-    @Override
-    public String toString() {
-        return "ARRAY[" + elementType + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
deleted file mode 100644
index 038b147..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/ChoiceDataType.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class ChoiceDataType extends DataType {
-    private final List<DataType> possibleSubTypes;
-
-    public ChoiceDataType(final List<DataType> possibleSubTypes) {
-        super(RecordFieldType.CHOICE, null);
-        this.possibleSubTypes = Objects.requireNonNull(possibleSubTypes);
-    }
-
-    public List<DataType> getPossibleSubTypes() {
-        return possibleSubTypes;
-    }
-
-    @Override
-    public RecordFieldType getFieldType() {
-        return RecordFieldType.CHOICE;
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 + 41 * getFieldType().hashCode() + 41 * (possibleSubTypes == null ? 0 : possibleSubTypes.hashCode());
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof ChoiceDataType)) {
-            return false;
-        }
-
-        final ChoiceDataType other = (ChoiceDataType) obj;
-        return getFieldType().equals(other.getFieldType()) && Objects.equals(possibleSubTypes, other.possibleSubTypes);
-    }
-
-    @Override
-    public String toString() {
-        return "CHOICE" + possibleSubTypes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
deleted file mode 100644
index a85fb5e..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/MapDataType.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-
-public class MapDataType extends DataType {
-    private final DataType valueType;
-
-    public MapDataType(final DataType elementType) {
-        super(RecordFieldType.MAP, null);
-        this.valueType = elementType;
-    }
-
-    public DataType getValueType() {
-        return valueType;
-    }
-
-    @Override
-    public RecordFieldType getFieldType() {
-        return RecordFieldType.MAP;
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 + 41 * getFieldType().hashCode() + 41 * (valueType == null ? 0 : valueType.hashCode());
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof MapDataType)) {
-            return false;
-        }
-
-        final MapDataType other = (MapDataType) obj;
-        return getValueType().equals(other.getValueType()) && Objects.equals(valueType, other.valueType);
-    }
-
-    @Override
-    public String toString() {
-        return "MAP[" + valueType + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
deleted file mode 100644
index 006d34c..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/type/RecordDataType.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.type;
-
-import java.util.Objects;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public class RecordDataType extends DataType {
-    private final RecordSchema childSchema;
-
-    public RecordDataType(final RecordSchema childSchema) {
-        super(RecordFieldType.RECORD, null);
-        this.childSchema = childSchema;
-    }
-
-    @Override
-    public RecordFieldType getFieldType() {
-        return RecordFieldType.RECORD;
-    }
-
-    public RecordSchema getChildSchema() {
-        return childSchema;
-    }
-
-    @Override
-    public int hashCode() {
-        return 31 + 41 * getFieldType().hashCode() + 41 * (childSchema == null ? 0 : childSchema.hashCode());
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (!(obj instanceof RecordDataType)) {
-            return false;
-        }
-
-        final RecordDataType other = (RecordDataType) obj;
-        return getFieldType().equals(other.getFieldType()) && Objects.equals(childSchema, other.childSchema);
-    }
-
-    @Override
-    public String toString() {
-        return RecordFieldType.RECORD.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
deleted file mode 100644
index 05b3157..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.util;
-
-import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TimeZone;
-import java.util.function.Consumer;
-
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.type.ChoiceDataType;
-import org.apache.nifi.serialization.record.type.RecordDataType;
-
-public class DataTypeUtils {
-
-    private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
-
-    public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
-        return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), fieldName);
-    }
-
-    public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat, final String timestampFormat, final String fieldName) {
-        switch (dataType.getFieldType()) {
-            case BIGINT:
-                return toBigInt(value, fieldName);
-            case BOOLEAN:
-                return toBoolean(value, fieldName);
-            case BYTE:
-                return toByte(value, fieldName);
-            case CHAR:
-                return toCharacter(value, fieldName);
-            case DATE:
-                return toDate(value, dateFormat, fieldName);
-            case DOUBLE:
-                return toDouble(value, fieldName);
-            case FLOAT:
-                return toFloat(value, fieldName);
-            case INT:
-                return toInteger(value, fieldName);
-            case LONG:
-                return toLong(value, fieldName);
-            case SHORT:
-                return toShort(value, fieldName);
-            case STRING:
-                return toString(value, dateFormat, timeFormat, timestampFormat);
-            case TIME:
-                return toTime(value, timeFormat, fieldName);
-            case TIMESTAMP:
-                return toTimestamp(value, timestampFormat, fieldName);
-            case ARRAY:
-                return toArray(value, fieldName);
-            case MAP:
-                return toMap(value, fieldName);
-            case RECORD:
-                final RecordDataType recordType = (RecordDataType) dataType;
-                final RecordSchema childSchema = recordType.getChildSchema();
-                return toRecord(value, childSchema, fieldName);
-            case CHOICE: {
-                if (value == null) {
-                    return null;
-                }
-
-                final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
-                final DataType chosenDataType = chooseDataType(value, choiceDataType);
-                if (chosenDataType == null) {
-                    throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
-                        + " for field " + fieldName + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes());
-                }
-
-                return convertType(value, chosenDataType, fieldName);
-            }
-        }
-
-        return null;
-    }
-
-
-    public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
-        switch (dataType.getFieldType()) {
-            case ARRAY:
-                return isArrayTypeCompatible(value);
-            case BIGINT:
-                return isBigIntTypeCompatible(value);
-            case BOOLEAN:
-                return isBooleanTypeCompatible(value);
-            case BYTE:
-                return isByteTypeCompatible(value);
-            case CHAR:
-                return isCharacterTypeCompatible(value);
-            case DATE:
-                return isDateTypeCompatible(value, dataType.getFormat());
-            case DOUBLE:
-                return isDoubleTypeCompatible(value);
-            case FLOAT:
-                return isFloatTypeCompatible(value);
-            case INT:
-                return isIntegerTypeCompatible(value);
-            case LONG:
-                return isLongTypeCompatible(value);
-            case RECORD:
-                return isRecordTypeCompatible(value);
-            case SHORT:
-                return isShortTypeCompatible(value);
-            case TIME:
-                return isTimeTypeCompatible(value, dataType.getFormat());
-            case TIMESTAMP:
-                return isTimestampTypeCompatible(value, dataType.getFormat());
-            case STRING:
-                return isStringTypeCompatible(value);
-            case MAP:
-                return isMapTypeCompatible(value);
-            case CHOICE: {
-                final DataType chosenDataType = chooseDataType(value, (ChoiceDataType) dataType);
-                return chosenDataType != null;
-            }
-        }
-
-        return false;
-    }
-
-    public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
-        for (final DataType subType : choiceType.getPossibleSubTypes()) {
-            if (isCompatibleDataType(value, subType)) {
-                return subType;
-            }
-        }
-
-        return null;
-    }
-
-    public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Record) {
-            return ((Record) value);
-        }
-
-        if (value instanceof Map) {
-            if (recordSchema == null) {
-                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
-                    + " to Record for field " + fieldName + " because the value is a Map but no Record Schema was provided");
-            }
-
-            final Map<?, ?> map = (Map<?, ?>) value;
-            final Map<String, Object> coercedValues = new HashMap<>();
-
-            for (final Map.Entry<?, ?> entry : map.entrySet()) {
-                final Object keyValue = entry.getKey();
-                if (keyValue == null) {
-                    continue;
-                }
-
-                final String key = keyValue.toString();
-                final Optional<DataType> desiredTypeOption = recordSchema.getDataType(key);
-                if (!desiredTypeOption.isPresent()) {
-                    continue;
-                }
-
-                final Object rawValue = entry.getValue();
-                final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName);
-                coercedValues.put(key, coercedValue);
-            }
-
-            return new MapRecord(recordSchema, coercedValues);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record for field " + fieldName);
-    }
-
-    public static boolean isRecordTypeCompatible(final Object value) {
-        return value != null && value instanceof Record;
-    }
-
-    public static Object[] toArray(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Object[]) {
-            return (Object[]) value;
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
-    }
-
-    public static boolean isArrayTypeCompatible(final Object value) {
-        return value != null && value instanceof Object[];
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Map<String, Object> toMap(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Map) {
-            final Map<?, ?> original = (Map<?, ?>) value;
-
-            boolean keysAreStrings = true;
-            for (final Object key : original.keySet()) {
-                if (!(key instanceof String)) {
-                    keysAreStrings = false;
-                }
-            }
-
-            if (keysAreStrings) {
-                return (Map<String, Object>) value;
-            }
-
-            final Map<String, Object> transformed = new HashMap<>();
-            for (final Map.Entry<?, ?> entry : original.entrySet()) {
-                final Object key = entry.getKey();
-                if (key == null) {
-                    transformed.put(null, entry.getValue());
-                } else {
-                    transformed.put(key.toString(), entry.getValue());
-                }
-            }
-
-            return transformed;
-        }
-
-        if (value instanceof Record) {
-            final Record record = (Record) value;
-            final RecordSchema recordSchema = record.getSchema();
-            if (recordSchema == null) {
-                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type Record to Map for field " + fieldName
-                    + " because Record does not have an associated Schema");
-            }
-
-            final Map<String, Object> map = new HashMap<>();
-            for (final String recordFieldName : recordSchema.getFieldNames()) {
-                map.put(recordFieldName, record.getValue(recordFieldName));
-            }
-
-            return map;
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Map for field " + fieldName);
-    }
-
-    public static boolean isMapTypeCompatible(final Object value) {
-        return value != null && value instanceof Map;
-    }
-
-
-    public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof String) {
-            return (String) value;
-        }
-
-        if (value instanceof java.sql.Date) {
-            return getDateFormat(dateFormat).format((java.util.Date) value);
-        }
-        if (value instanceof java.sql.Time) {
-            return getDateFormat(timeFormat).format((java.util.Date) value);
-        }
-        if (value instanceof java.sql.Timestamp) {
-            return getDateFormat(timestampFormat).format((java.util.Date) value);
-        }
-        if (value instanceof java.util.Date) {
-            return getDateFormat(timestampFormat).format((java.util.Date) value);
-        }
-
-        return value.toString();
-    }
-
-    public static boolean isStringTypeCompatible(final Object value) {
-        return value != null;
-    }
-
-    public static java.sql.Date toDate(final Object value, final String format, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Date) {
-            return (Date) value;
-        }
-
-        if (value instanceof Number) {
-            final long longValue = ((Number) value).longValue();
-            return new Date(longValue);
-        }
-
-        if (value instanceof String) {
-            try {
-                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
-                return new Date(utilDate.getTime());
-            } catch (final ParseException e) {
-                throw new IllegalTypeConversionException("Could not convert value [" + value
-                    + "] of type java.lang.String to Date because the value is not in the expected date format: " + format + " for field " + fieldName);
-            }
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
-    }
-
-    public static boolean isDateTypeCompatible(final Object value, final String format) {
-        if (value == null) {
-            return false;
-        }
-
-        if (value instanceof java.util.Date || value instanceof Number) {
-            return true;
-        }
-
-        if (value instanceof String) {
-            try {
-                getDateFormat(format).parse((String) value);
-                return true;
-            } catch (final ParseException e) {
-                return false;
-            }
-        }
-
-        return false;
-    }
-
-    public static Time toTime(final Object value, final String format, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Time) {
-            return (Time) value;
-        }
-
-        if (value instanceof Number) {
-            final long longValue = ((Number) value).longValue();
-            return new Time(longValue);
-        }
-
-        if (value instanceof String) {
-            try {
-                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
-                return new Time(utilDate.getTime());
-            } catch (final ParseException e) {
-                throw new IllegalTypeConversionException("Could not convert value [" + value
-                    + "] of type java.lang.String to Time for field " + fieldName + " because the value is not in the expected date format: " + format);
-            }
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time for field " + fieldName);
-    }
-
-    private static DateFormat getDateFormat(final String format) {
-        final DateFormat df = new SimpleDateFormat(format);
-        df.setTimeZone(gmt);
-        return df;
-    }
-
-    public static boolean isTimeTypeCompatible(final Object value, final String format) {
-        return isDateTypeCompatible(value, format);
-    }
-
-    public static Timestamp toTimestamp(final Object value, final String format, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Timestamp) {
-            return (Timestamp) value;
-        }
-
-        if (value instanceof Number) {
-            final long longValue = ((Number) value).longValue();
-            return new Timestamp(longValue);
-        }
-
-        if (value instanceof String) {
-            try {
-                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
-                return new Timestamp(utilDate.getTime());
-            } catch (final ParseException e) {
-                throw new IllegalTypeConversionException("Could not convert value [" + value
-                    + "] of type java.lang.String to Timestamp for field " + fieldName + " because the value is not in the expected date format: " + format);
-            }
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp for field " + fieldName);
-    }
-
-    public static boolean isTimestampTypeCompatible(final Object value, final String format) {
-        return isDateTypeCompatible(value, format);
-    }
-
-
-    public static BigInteger toBigInt(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof BigInteger) {
-            return (BigInteger) value;
-        }
-        if (value instanceof Long) {
-            return BigInteger.valueOf((Long) value);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger for field " + fieldName);
-    }
-
-    public static boolean isBigIntTypeCompatible(final Object value) {
-        return value == null && (value instanceof BigInteger || value instanceof Long);
-    }
-
-    public static Boolean toBoolean(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Boolean) {
-            return (Boolean) value;
-        }
-        if (value instanceof String) {
-            final String string = (String) value;
-            if (string.equalsIgnoreCase("true")) {
-                return Boolean.TRUE;
-            } else if (string.equalsIgnoreCase("false")) {
-                return Boolean.FALSE;
-            }
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean for field " + fieldName);
-    }
-
-    public static boolean isBooleanTypeCompatible(final Object value) {
-        if (value == null) {
-            return false;
-        }
-        if (value instanceof Boolean) {
-            return true;
-        }
-        if (value instanceof String) {
-            final String string = (String) value;
-            return string.equalsIgnoreCase("true") || string.equalsIgnoreCase("false");
-        }
-        return false;
-    }
-
-    public static Double toDouble(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).doubleValue();
-        }
-
-        if (value instanceof String) {
-            return Double.parseDouble((String) value);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double for field " + fieldName);
-    }
-
-    public static boolean isDoubleTypeCompatible(final Object value) {
-        return isNumberTypeCompatible(value, s -> Double.parseDouble(s));
-    }
-
-    private static boolean isNumberTypeCompatible(final Object value, final Consumer<String> stringValueVerifier) {
-        if (value == null) {
-            return false;
-        }
-
-        if (value instanceof Number) {
-            return true;
-        }
-
-        if (value instanceof String) {
-            try {
-                stringValueVerifier.accept((String) value);
-                return true;
-            } catch (final NumberFormatException nfe) {
-                return false;
-            }
-        }
-
-        return false;
-    }
-
-    public static Float toFloat(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).floatValue();
-        }
-
-        if (value instanceof String) {
-            return Float.parseFloat((String) value);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float for field " + fieldName);
-    }
-
-    public static boolean isFloatTypeCompatible(final Object value) {
-        return isNumberTypeCompatible(value, s -> Float.parseFloat(s));
-    }
-
-    public static Long toLong(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).longValue();
-        }
-
-        if (value instanceof String) {
-            return Long.parseLong((String) value);
-        }
-
-        if (value instanceof java.util.Date) {
-            return ((java.util.Date) value).getTime();
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long for field " + fieldName);
-    }
-
-    public static boolean isLongTypeCompatible(final Object value) {
-        if (value == null) {
-            return false;
-        }
-
-        if (value instanceof Number) {
-            return true;
-        }
-
-        if (value instanceof java.util.Date) {
-            return true;
-        }
-
-        if (value instanceof String) {
-            try {
-                Long.parseLong((String) value);
-                return true;
-            } catch (final NumberFormatException nfe) {
-                return false;
-            }
-        }
-
-        return false;
-    }
-
-
-    public static Integer toInteger(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).intValue();
-        }
-
-        if (value instanceof String) {
-            return Integer.parseInt((String) value);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer for field " + fieldName);
-    }
-
-    public static boolean isIntegerTypeCompatible(final Object value) {
-        return isNumberTypeCompatible(value, s -> Integer.parseInt(s));
-    }
-
-
-    public static Short toShort(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).shortValue();
-        }
-
-        if (value instanceof String) {
-            return Short.parseShort((String) value);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short for field " + fieldName);
-    }
-
-    public static boolean isShortTypeCompatible(final Object value) {
-        return isNumberTypeCompatible(value, s -> Short.parseShort(s));
-    }
-
-    public static Byte toByte(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Number) {
-            return ((Number) value).byteValue();
-        }
-
-        if (value instanceof String) {
-            return Byte.parseByte((String) value);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte for field " + fieldName);
-    }
-
-    public static boolean isByteTypeCompatible(final Object value) {
-        return isNumberTypeCompatible(value, s -> Byte.parseByte(s));
-    }
-
-
-    public static Character toCharacter(final Object value, final String fieldName) {
-        if (value == null) {
-            return null;
-        }
-
-        if (value instanceof Character) {
-            return ((Character) value);
-        }
-
-        if (value instanceof CharSequence) {
-            final CharSequence charSeq = (CharSequence) value;
-            if (charSeq.length() == 0) {
-                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
-                    + " to Character because it has a length of 0 for field " + fieldName);
-            }
-
-            return charSeq.charAt(0);
-        }
-
-        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character for field " + fieldName);
-    }
-
-    public static boolean isCharacterTypeCompatible(final Object value) {
-        return value != null && (value instanceof Character || (value instanceof CharSequence && ((CharSequence) value).length() > 0));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
deleted file mode 100644
index 38b5d20..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record.util;
-
-public class IllegalTypeConversionException extends RuntimeException {
-
-    public IllegalTypeConversionException(final String message) {
-        super(message);
-    }
-
-    public IllegalTypeConversionException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
deleted file mode 100644
index 5a61275..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestSimpleRecordSchema {
-
-    @Test
-    public void testPreventsTwoFieldsWithSameAlias() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
-        fields.add(new RecordField("goodbye", RecordFieldType.STRING.getDataType(), null, set("baz", "bar")));
-
-        try {
-            new SimpleRecordSchema(fields);
-            Assert.fail("Was able to create two fields with same alias");
-        } catch (final IllegalArgumentException expected) {
-        }
-    }
-
-    @Test
-    public void testPreventsTwoFieldsWithSameName() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
-        fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType()));
-
-        try {
-            new SimpleRecordSchema(fields);
-            Assert.fail("Was able to create two fields with same name");
-        } catch (final IllegalArgumentException expected) {
-        }
-    }
-
-    @Test
-    public void testPreventsTwoFieldsWithConflictingNamesAliases() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
-        fields.add(new RecordField("bar", RecordFieldType.STRING.getDataType()));
-
-        try {
-            new SimpleRecordSchema(fields);
-            Assert.fail("Was able to create two fields with conflicting names/aliases");
-        } catch (final IllegalArgumentException expected) {
-        }
-    }
-
-    private Set<String> set(final String... values) {
-        final Set<String> set = new HashSet<>();
-        for (final String value : values) {
-            set.add(value);
-        }
-        return set;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
deleted file mode 100644
index 82e20a6..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.serialization.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestMapRecord {
-
-    @Test
-    public void testDefaultValue() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        final Record record = new MapRecord(schema, values);
-
-        assertNull(record.getValue("noDefault"));
-        assertEquals("hello", record.getValue("defaultOfHello"));
-    }
-
-    @Test
-    public void testDefaultValueInGivenField() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType()));
-        fields.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        final Record record = new MapRecord(schema, values);
-
-        assertNull(record.getValue("noDefault"));
-        assertEquals("hello", record.getValue("defaultOfHello"));
-
-        final RecordField newField = new RecordField("noDefault", RecordFieldType.STRING.getDataType(), "new");
-        assertEquals("new", record.getValue(newField));
-    }
-
-    @Test
-    public void testIllegalDefaultValue() {
-        new RecordField("hello", RecordFieldType.STRING.getDataType(), 84);
-        new RecordField("hello", RecordFieldType.STRING.getDataType(), (Object) null);
-        new RecordField("hello", RecordFieldType.INT.getDataType(), 84);
-        new RecordField("hello", RecordFieldType.INT.getDataType(), (Object) null);
-
-        try {
-            new RecordField("hello", RecordFieldType.INT.getDataType(), "foo");
-            Assert.fail("Was able to set a default value of \"foo\" for INT type");
-        } catch (final IllegalArgumentException expected) {
-            // expected
-        }
-    }
-
-    private Set<String> set(final String... values) {
-        final Set<String> set = new HashSet<>();
-        for (final String value : values) {
-            set.add(value);
-        }
-        return set;
-    }
-
-    @Test
-    public void testAliasOneValue() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        values.put("bar", 1);
-
-        final Record record = new MapRecord(schema, values);
-        assertEquals(1, record.getValue("foo"));
-        assertEquals(1, record.getValue("bar"));
-        assertEquals(1, record.getValue("baz"));
-    }
-
-    @Test
-    public void testAliasConflictingValues() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        values.put("bar", 1);
-        values.put("foo", null);
-
-        final Record record = new MapRecord(schema, values);
-        assertEquals(1, record.getValue("foo"));
-        assertEquals(1, record.getValue("bar"));
-        assertEquals(1, record.getValue("baz"));
-    }
-
-    @Test
-    public void testAliasConflictingAliasValues() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        values.put("baz", 1);
-        values.put("bar", 33);
-
-        final Record record = new MapRecord(schema, values);
-        assertEquals(33, record.getValue("foo"));
-        assertEquals(33, record.getValue("bar"));
-        assertEquals(33, record.getValue("baz"));
-    }
-
-    @Test
-    public void testAliasInGivenField() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        values.put("bar", 33);
-
-        final Record record = new MapRecord(schema, values);
-        assertEquals(33, record.getValue("foo"));
-        assertEquals(33, record.getValue("bar"));
-        assertEquals(33, record.getValue("baz"));
-
-        final RecordField noAlias = new RecordField("hello", RecordFieldType.STRING.getDataType());
-        assertNull(record.getValue(noAlias));
-
-        final RecordField withAlias = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("baz"));
-        assertEquals(33, record.getValue(withAlias));
-        assertEquals("33", record.getAsString(withAlias, withAlias.getDataType().getFormat()));
-    }
-
-
-    @Test
-    public void testDefaultValueWithAliasValue() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        values.put("baz", 1);
-        values.put("bar", 33);
-
-        final Record record = new MapRecord(schema, values);
-        assertEquals(33, record.getValue("foo"));
-        assertEquals(33, record.getValue("bar"));
-        assertEquals(33, record.getValue("baz"));
-    }
-
-    @Test
-    public void testDefaultValueWithAliasesDefined() {
-        final List<RecordField> fields = new ArrayList<>();
-        fields.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
-
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-        final Map<String, Object> values = new HashMap<>();
-        final Record record = new MapRecord(schema, values);
-        assertEquals("hello", record.getValue("foo"));
-        assertEquals("hello", record.getValue("bar"));
-        assertEquals("hello", record.getValue("baz"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 16479f1..81a6775 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -35,9 +35,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-schema-registry-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index f5b4373..ae6254e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -50,7 +51,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
 
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
-        final String schemaAccessStrategy = getConfigurationContext().getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+        final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
             return new AvroReaderWithEmbeddedSchema(in);
         } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index 621ec74..13a8317 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -17,39 +17,20 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
-public abstract class AvroRecordReader implements RecordReader {
+import java.io.IOException;
+import java.util.Map;
 
+public abstract class AvroRecordReader implements RecordReader {
 
     protected abstract GenericRecord nextAvroRecord() throws IOException;
 
-
     @Override
     public Record nextRecord() throws IOException, MalformedRecordException {
         GenericRecord record = nextAvroRecord();
@@ -58,148 +39,8 @@ public abstract class AvroRecordReader implements RecordReader {
         }
 
         final RecordSchema schema = getSchema();
-        final Map<String, Object> values = convertAvroRecordToMap(record, schema);
+        final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema);
         return new MapRecord(schema, values);
     }
 
-
-    private Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
-        final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
-
-        for (final RecordField recordField : recordSchema.getFields()) {
-            Object value = avroRecord.get(recordField.getFieldName());
-            if (value == null) {
-                for (final String alias : recordField.getAliases()) {
-                    value = avroRecord.get(alias);
-                    if (value != null) {
-                        break;
-                    }
-                }
-            }
-
-            final String fieldName = recordField.getFieldName();
-            final Field avroField = avroRecord.getSchema().getField(fieldName);
-            if (avroField == null) {
-                values.put(fieldName, null);
-                continue;
-            }
-
-            final Schema fieldSchema = avroField.schema();
-            final Object rawValue = normalizeValue(value, fieldSchema);
-
-            final DataType desiredType = recordField.getDataType();
-            final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
-
-            values.put(fieldName, coercedValue);
-        }
-
-        return values;
-    }
-
-    private Object normalizeValue(final Object value, final Schema avroSchema) {
-        if (value == null) {
-            return null;
-        }
-
-        switch (avroSchema.getType()) {
-            case INT: {
-                final LogicalType logicalType = avroSchema.getLogicalType();
-                if (logicalType == null) {
-                    return value;
-                }
-
-                final String logicalName = logicalType.getName();
-                if (LogicalTypes.date().getName().equals(logicalName)) {
-                    // date logical name means that the value is number of days since Jan 1, 1970
-                    return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
-                } else if (LogicalTypes.timeMillis().equals(logicalName)) {
-                    // time-millis logical name means that the value is number of milliseconds since midnight.
-                    return new java.sql.Time((int) value);
-                }
-
-                break;
-            }
-            case LONG: {
-                final LogicalType logicalType = avroSchema.getLogicalType();
-                if (logicalType == null) {
-                    return value;
-                }
-
-                final String logicalName = logicalType.getName();
-                if (LogicalTypes.timeMicros().getName().equals(logicalName)) {
-                    return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
-                } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) {
-                    return new java.sql.Timestamp((long) value);
-                } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) {
-                    return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
-                }
-                break;
-            }
-            case UNION:
-                if (value instanceof GenericData.Record) {
-                    final GenericData.Record avroRecord = (GenericData.Record) value;
-                    return normalizeValue(value, avroRecord.getSchema());
-                }
-                break;
-            case RECORD:
-                final GenericData.Record record = (GenericData.Record) value;
-                final Schema recordSchema = record.getSchema();
-                final List<Field> recordFields = recordSchema.getFields();
-                final Map<String, Object> values = new HashMap<>(recordFields.size());
-                for (final Field field : recordFields) {
-                    final Object avroFieldValue = record.get(field.name());
-                    final Object fieldValue = normalizeValue(avroFieldValue, field.schema());
-                    values.put(field.name(), fieldValue);
-                }
-                final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
-                return new MapRecord(childSchema, values);
-            case BYTES:
-                final ByteBuffer bb = (ByteBuffer) value;
-                return AvroTypeUtil.convertByteArray(bb.array());
-            case FIXED:
-                final GenericFixed fixed = (GenericFixed) value;
-                return AvroTypeUtil.convertByteArray(fixed.bytes());
-            case ENUM:
-                return value.toString();
-            case NULL:
-                return null;
-            case STRING:
-                return value.toString();
-            case ARRAY:
-                final Array<?> array = (Array<?>) value;
-                final Object[] valueArray = new Object[array.size()];
-                for (int i = 0; i < array.size(); i++) {
-                    final Schema elementSchema = avroSchema.getElementType();
-                    valueArray[i] = normalizeValue(array.get(i), elementSchema);
-                }
-                return valueArray;
-            case MAP:
-                final Map<?, ?> avroMap = (Map<?, ?>) value;
-                final Map<String, Object> map = new HashMap<>(avroMap.size());
-                for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
-                    Object obj = entry.getValue();
-                    if (obj instanceof Utf8 || obj instanceof CharSequence) {
-                        obj = obj.toString();
-                    }
-
-                    final String key = entry.getKey().toString();
-                    obj = normalizeValue(obj, avroSchema.getValueType());
-
-                    map.put(key, obj);
-                }
-
-                final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
-                final List<RecordField> mapFields = new ArrayList<>();
-                for (final String key : map.keySet()) {
-                    mapFields.add(new RecordField(key, elementType));
-                }
-                final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
-                return new MapRecord(mapSchema, map);
-        }
-
-        return value;
-    }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 381e978..62e53ea 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -33,6 +33,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -60,7 +61,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
-        properties.add(SCHEMA_ACCESS_STRATEGY);
+        properties.add(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY);
         properties.add(SCHEMA_REGISTRY);
         return properties;
     }