You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/09/13 13:09:29 UTC

[nifi] branch master updated: fix deserialization issues with NiFiRecordSerDe for hive3streaming

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d1bb09  fix deserialization issues with NiFiRecordSerDe for hive3streaming
3d1bb09 is described below

commit 3d1bb09ff85462b08ffadcd2cbc8a5d6036a7cdc
Author: korir <gi...@gmail.com>
AuthorDate: Fri May 31 19:50:09 2019 +0300

    fix deserialization issues with NiFiRecordSerDe for hive3streaming
    
    NIFI-6295: Refactored NiFiRecordSerDe to handle nested complex types
    
    NIFI-6295: Incorporated review comments
    
    NIFI-6295: Refactored unit tests for NiFiRecordSerDe
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3684
---
 .../nifi-hive-bundle/nifi-hive3-processors/pom.xml |  33 +-
 .../org/apache/hive/streaming/NiFiRecordSerDe.java | 248 +++++++-------
 .../apache/hive/streaming/TestNiFiRecordSerDe.java | 373 +++++++++++++++++++++
 .../processors/hive/TestPutHive3Streaming.java     |  95 +++++-
 .../src/test/resources/nested-map-input.json       |  18 +
 .../src/test/resources/nested-map-schema.avsc      |  48 +++
 6 files changed, 679 insertions(+), 136 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
index e790283..38f72a5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
@@ -13,7 +13,7 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -140,5 +140,36 @@
             <artifactId>hamcrest-all</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/nested-map-input.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
index e628474..51a06c8 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -38,14 +40,17 @@ import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.TimestampParser;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
-import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,6 +62,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public class NiFiRecordSerDe extends AbstractSerDe {
 
@@ -71,8 +77,6 @@ public class NiFiRecordSerDe extends AbstractSerDe {
 
     private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
 
-    private Map<String, Integer> fieldPositionMap;
-
     public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
         this.recordReader = recordReader;
         this.log = log;
@@ -114,12 +118,6 @@ public class NiFiRecordSerDe extends AbstractSerDe {
         log.debug("schema : {}", new Object[]{schema});
         cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
         tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
-        // Populate mapping of field names to column positions
-        try {
-            populateFieldPositionMap();
-        } catch (MalformedRecordException | IOException e) {
-            throw new SerDeException(e);
-        }
         stats = new SerDeStats();
     }
 
@@ -142,43 +140,40 @@ public class NiFiRecordSerDe extends AbstractSerDe {
     public Object deserialize(Writable writable) throws SerDeException {
         ObjectWritable t = (ObjectWritable) writable;
         Record record = (Record) t.get();
-        List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
+
+        List<Object> result = deserialize(record, schema);
+
+        stats.setRowCount(stats.getRowCount() + 1);
+
+        return result;
+    }
+
+    private List<Object> deserialize(Record record, StructTypeInfo schema) throws SerDeException {
+        List<Object> result = new ArrayList<>(Collections.nCopies(schema.getAllStructFieldNames().size(), null));
+
         try {
             RecordSchema recordSchema = record.getSchema();
             for (RecordField field : recordSchema.getFields()) {
-                String fieldName = field.getFieldName();
-                String normalizedFieldName = fieldName.toLowerCase();
-
-                // Get column position of field name, and set field value there
-                Integer fpos = fieldPositionMap.get(normalizedFieldName);
-                if(fpos == null || fpos == -1) {
-                    // This is either a partition column or not a column in the target table, ignore either way
-                    continue;
-                }
-                Object currField = extractCurrentField(record, field, schema.getStructFieldTypeInfo(normalizedFieldName));
-                r.set(fpos, currField);
+                populateRecord(result, record.getValue(field), field, schema);
             }
-            stats.setRowCount(stats.getRowCount() + 1);
-
+        } catch(SerDeException se) {
+            log.error("Error [{}] parsing Record [{}].", new Object[]{se.toString(), record}, se);
+            throw se;
         } catch (Exception e) {
-            log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), t}, e);
+            log.error("Error [{}] parsing Record [{}].", new Object[]{e.toString(), record}, e);
             throw new SerDeException(e);
         }
 
-        return r;
+        return result;
     }
 
-    /**
-     * Utility method to extract current expected field from given record.
-     */
     @SuppressWarnings("unchecked")
-    private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) throws SerDeException {
-        Object val;
-        if (field == null) {
+    private Object extractCurrentField(final Object fieldValue, final String fieldName, final DataType fieldDataType, final TypeInfo fieldTypeInfo) throws SerDeException {
+        if(fieldValue == null){
             return null;
         }
-        String fieldName = field.getFieldName();
 
+        Object val;
         switch (fieldTypeInfo.getCategory()) {
             case PRIMITIVE:
                 PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN;
@@ -187,152 +182,151 @@ public class NiFiRecordSerDe extends AbstractSerDe {
                 }
                 switch (primitiveCategory) {
                     case BYTE:
-                        Integer bIntValue = record.getAsInt(fieldName);
-                        val = bIntValue == null ? null : bIntValue.byteValue();
+                        Integer bIntValue = DataTypeUtils.toInteger(fieldValue, fieldName);
+                        val = bIntValue.byteValue();
                         break;
                     case SHORT:
-                        Integer sIntValue = record.getAsInt(fieldName);
-                        val = sIntValue == null ? null : sIntValue.shortValue();
+                        Integer sIntValue = DataTypeUtils.toInteger(fieldValue, fieldName);
+                        val = sIntValue.shortValue();
                         break;
                     case INT:
-                        val = record.getAsInt(fieldName);
+                        val = DataTypeUtils.toInteger(fieldValue, fieldName);
                         break;
                     case LONG:
-                        val = record.getAsLong(fieldName);
+                        val = DataTypeUtils.toLong(fieldValue, fieldName);
                         break;
                     case BOOLEAN:
-                        val = record.getAsBoolean(fieldName);
+                        val = DataTypeUtils.toBoolean(fieldValue, fieldName);
                         break;
                     case FLOAT:
-                        val = record.getAsFloat(fieldName);
+                        val = DataTypeUtils.toFloat(fieldValue, fieldName);
                         break;
                     case DOUBLE:
-                        val = record.getAsDouble(fieldName);
+                        val = DataTypeUtils.toDouble(fieldValue, fieldName);
                         break;
                     case STRING:
                     case VARCHAR:
                     case CHAR:
-                        val = record.getAsString(fieldName);
+                        val = DataTypeUtils.toString(fieldValue, fieldName);
                         break;
                     case BINARY:
-                        Object[] array = record.getAsArray(fieldName);
-                        if (array == null) {
-                            return null;
+                        final ArrayDataType arrayDataType;
+                        if(fieldValue instanceof String) {
+                            // Treat this as an array of bytes
+                            arrayDataType = (ArrayDataType) RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+                        } else {
+                            arrayDataType = (ArrayDataType) fieldDataType;
                         }
+                        Object[] array = DataTypeUtils.toArray(fieldValue, fieldName, arrayDataType.getElementType());
                         val = AvroTypeUtil.convertByteArray(array).array();
                         break;
                     case DATE:
-                        Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
-                        if(d != null) {
-                            org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
-                            hiveDate.setTimeInMillis(d.getTime());
-                            val = hiveDate;
-                        } else {
-                            val = null;
-                        }
+                        Date d = DataTypeUtils.toDate(fieldValue, () -> DataTypeUtils.getDateFormat(fieldDataType.getFormat()), fieldName);
+                        org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+                        hiveDate.setTimeInMillis(d.getTime());
+                        val = hiveDate;
                         break;
                     // ORC doesn't currently handle TIMESTAMPLOCALTZ
                     case TIMESTAMP:
-                        Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
-                        if(ts != null) {
-                            // Convert to Hive's Timestamp type
-                            org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
-                            hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
-                            val = hivetimestamp;
-                        } else {
-                            val = null;
-                        }
+                        Timestamp ts = DataTypeUtils.toTimestamp(fieldValue, () -> DataTypeUtils.getDateFormat(fieldDataType.getFormat()), fieldName);
+                        // Convert to Hive's Timestamp type
+                        org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+                        hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+                        val = hivetimestamp;
                         break;
                     case DECIMAL:
-                        Double value = record.getAsDouble(fieldName);
-                        val = value == null ? null : HiveDecimal.create(value);
+                        if(fieldValue instanceof BigDecimal){
+                            val = HiveDecimal.create((BigDecimal) fieldValue);
+                        } else if (fieldValue instanceof Number) {
+                            val = HiveDecimal.create(((Number)fieldValue).doubleValue());
+                        } else {
+                            val = HiveDecimal.create(DataTypeUtils.toDouble(fieldValue, fieldDataType.getFormat()));
+                        }
                         break;
                     default:
                         throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
                 }
                 break;
             case LIST:
-                Object[] value = record.getAsArray(fieldName);
-                val = value == null ? null : Arrays.asList(value);
+                Object[] value = (Object[])fieldValue;
+                ListTypeInfo listTypeInfo = (ListTypeInfo)fieldTypeInfo;
+                TypeInfo nestedType = listTypeInfo.getListElementTypeInfo();
+                List<Object> converted = new ArrayList<>(value.length);
+                for (Object o : value) {
+                    converted.add(extractCurrentField(o, fieldName, ((ArrayDataType) fieldDataType).getElementType(), nestedType));
+                }
+                val = converted;
                 break;
             case MAP:
-                val = record.getValue(fieldName);
+                //in nifi all maps are <String,?> so use that
+                Map<String, Object> valueMap = (Map<String,Object>)fieldValue;
+                MapTypeInfo mapTypeInfo = (MapTypeInfo)fieldTypeInfo;
+                Map<Object, Object> convertedMap = new HashMap<>(valueMap.size());
+                //get a key record field, nifi map keys are always string. synthesize new
+                //record fields for the map field key and value.
+                for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+                    convertedMap.put(
+                            extractCurrentField(entry.getKey(), fieldName + ".key", RecordFieldType.STRING.getDataType(), mapTypeInfo.getMapKeyTypeInfo()),
+                            extractCurrentField(entry.getValue(), fieldName + ".value", ((MapDataType) fieldDataType).getValueType(), mapTypeInfo.getMapValueTypeInfo())
+                    );
+                }
+                val = convertedMap;
                 break;
             case STRUCT:
-                // The Hive StandardStructObjectInspector expects the object corresponding to a "struct" to be an array or List rather than a Map.
-                // Do the conversion here, calling extractCurrentField recursively to traverse any nested structs.
-                Record nestedRecord = (Record) record.getValue(fieldName);
-                if (nestedRecord == null) {
-                    return null;
-                }
-                try {
-                    RecordSchema recordSchema = nestedRecord.getSchema();
-                    List<RecordField> recordFields = recordSchema.getFields();
-                    if (recordFields == null || recordFields.isEmpty()) {
-                        return Collections.emptyList();
-                    }
-                    // This List will hold the values of the entries in the Map
-                    List<Object> structList = new ArrayList<>(recordFields.size());
-                    StructTypeInfo typeInfo = (StructTypeInfo) schema.getStructFieldTypeInfo(fieldName);
-                    for (RecordField nestedRecordField : recordFields) {
-                        String fName = nestedRecordField.getFieldName();
-                        String normalizedFieldName = fName.toLowerCase();
-                        structList.add(extractCurrentField(nestedRecord, nestedRecordField, typeInfo.getStructFieldTypeInfo(normalizedFieldName)));
-                    }
-                    return structList;
-                } catch (Exception e) {
-                    log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), nestedRecord}, e);
-                    throw new SerDeException(e);
-                }
-                // break unreachable
+                Record nestedRecord = (Record) fieldValue;
+                StructTypeInfo s = (StructTypeInfo) fieldTypeInfo;
+                val = deserialize(nestedRecord, s);
+                break;
             default:
-                log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + field.getDataType().toString());
+                log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + fieldDataType.toString());
                 return null;
         }
         return val;
     }
 
+
+
     @Override
     public ObjectInspector getObjectInspector() {
         return cachedObjectInspector;
     }
 
-    private void populateFieldPositionMap() throws MalformedRecordException, IOException {
-        // Populate the mapping of field names to column positions only once
-        fieldPositionMap = new HashMap<>(columnNames.size());
 
-        RecordSchema recordSchema = recordReader.getSchema();
-        for (RecordField field : recordSchema.getFields()) {
-            String fieldName = field.getFieldName();
-            String normalizedFieldName = fieldName.toLowerCase();
 
-            int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
+    private void populateRecord(List<Object> r, Object value, RecordField field, StructTypeInfo typeInfo) throws SerDeException {
+
+        String fieldName = field.getFieldName();
+        String normalizedFieldName = fieldName.toLowerCase();
+
+        // Normalize struct field names and search for the specified (normalized) field name
+        int fpos = typeInfo.getAllStructFieldNames().stream().map((s) -> s == null ? null : s.toLowerCase()).collect(Collectors.toList()).indexOf(normalizedFieldName);
+        if (fpos == -1) {
+            Matcher m = INTERNAL_PATTERN.matcher(fieldName);
+            fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
+
+            log.debug("NPE finding position for field [{}] in schema [{}],"
+                    + " attempting to check if it is an internal column name like _col0", new Object[]{fieldName, typeInfo});
             if (fpos == -1) {
-                Matcher m = INTERNAL_PATTERN.matcher(fieldName);
-                fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
-
-                log.debug("NPE finding position for field [{}] in schema [{}],"
-                        + " attempting to check if it is an internal column name like _col0", new Object[]{fieldName, schema});
-                if (fpos == -1) {
-                    // unknown field, we return. We'll continue from the next field onwards. Log at debug level because partition columns will be "unknown fields"
-                    log.debug("Field {} is not found in the target table, ignoring...", new Object[]{field.getFieldName()});
-                    continue;
-                }
-                // If we get past this, then the column name did match the hive pattern for an internal
-                // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
-                // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
-                // if we find it.
-                if (!fieldName.equalsIgnoreCase(HiveConf.getColumnInternalName(fpos))) {
-                    log.error("Hive internal column name {} and position "
-                            + "encoding {} for the column name are at odds", new Object[]{fieldName, fpos});
-                    throw new IOException("Hive internal column name (" + fieldName
-                            + ") and position encoding (" + fpos
-                            + ") for the column name are at odds");
-                }
-                // If we reached here, then we were successful at finding an alternate internal
-                // column mapping, and we're about to proceed.
+                // unknown field, we return. We'll continue from the next field onwards. Log at debug level because partition columns will be "unknown fields"
+                log.debug("Field {} is not found in the target table, ignoring...", new Object[]{field.getFieldName()});
+                return;
             }
-            fieldPositionMap.put(normalizedFieldName, fpos);
+            // If we get past this, then the column name did match the hive pattern for an internal
+            // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
+            // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
+            // if we find it.
+            if (!fieldName.equalsIgnoreCase(HiveConf.getColumnInternalName(fpos))) {
+                log.error("Hive internal column name {} and position "
+                        + "encoding {} for the column name are at odds", new Object[]{fieldName, fpos});
+                throw new SerDeException("Hive internal column name (" + fieldName
+                        + ") and position encoding (" + fpos
+                        + ") for the column name are at odds");
+            }
+            // If we reached here, then we were successful at finding an alternate internal
+            // column mapping, and we're about to proceed.
         }
+        Object currField = extractCurrentField(value, fieldName, field.getDataType(), typeInfo.getStructFieldTypeInfo(normalizedFieldName));
+        r.set(fpos, currField);
     }
+
 }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java
new file mode 100644
index 0000000..2d22571
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestNiFiRecordSerDe {
+
+    @Test
+    public void testSimpleFields() throws SerDeException {
+        NiFiRecordSerDe serDe = createSerDe(
+                "bytec,shortc,intc,longc,boolc,floatc,doublec,stringc,varcharc,charc,datec,timestampc,decimalc",
+                "tinyint:smallint:int:bigint:boolean:float:double:string:varchar(50):char(1):date:timestamp:decimal"
+        );
+        RecordSchema schema = new SimpleRecordSchema(
+                Arrays.asList(
+                        new RecordField("bytec", RecordFieldType.BYTE.getDataType()),
+                        new RecordField("shortc", RecordFieldType.SHORT.getDataType()),
+                        new RecordField("intc", RecordFieldType.INT.getDataType()),
+                        new RecordField("longc", RecordFieldType.LONG.getDataType()),
+                        new RecordField("boolc", RecordFieldType.BOOLEAN.getDataType()),
+                        new RecordField("floatc", RecordFieldType.FLOAT.getDataType()),
+                        new RecordField("doublec", RecordFieldType.DOUBLE.getDataType()),
+                        new RecordField("stringc", RecordFieldType.STRING.getDataType()),
+                        new RecordField("varcharc", RecordFieldType.STRING.getDataType()),
+                        new RecordField("charc", RecordFieldType.CHAR.getDataType()),
+                        new RecordField("datec", RecordFieldType.DATE.getDataType("yyyy-MM-dd")),
+                        new RecordField("timestampc", RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd HH:mm:ss")),
+                        new RecordField("decimalc", RecordFieldType.DOUBLE.getDataType())
+                )
+        );
+
+        long currentTimeMillis = System.currentTimeMillis();
+
+        HashMap<String, Object> input = new HashMap<String, Object>() {{
+            put("bytec", (byte) 2);
+            put("shortc", (short) 45);
+            put("intc", 95);
+            put("longc", 876L);
+            put("boolc", Boolean.TRUE);
+            put("floatc", 4.56f);
+            put("doublec", 2.3445);
+            put("stringc", "test");
+            put("varcharc", "test2");
+            put("charc", 'c');
+            put("datec", new java.sql.Date(currentTimeMillis));
+            put("timestampc", new java.sql.Timestamp(currentTimeMillis));
+            put("decimalc", 0.45);
+        }};
+
+        Date date = new Date();
+        date.setTimeInMillis(currentTimeMillis);
+        Timestamp ts = new Timestamp();
+        ts.setTimeInMillis(currentTimeMillis);
+
+        List<Object> expected = Arrays.asList(
+                Byte.valueOf("2"),
+                Short.valueOf("45"),
+                95,
+                876L,
+                Boolean.TRUE,
+                4.56f,
+                2.3445,
+                "test",
+                "test2",
+                "c",
+                date,
+                ts,
+                HiveDecimal.create("0.45")
+        );
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+        assertEquals(expected, deserialized);
+    }
+
+    @Test
+    public void testArrays() throws SerDeException {
+        NiFiRecordSerDe serDe = createSerDe(
+                "binaryc,binaryc2",
+                "binary:binary"
+        );
+        RecordSchema schema = new SimpleRecordSchema(
+                Arrays.asList(
+                        new RecordField("binaryc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
+                        new RecordField("binaryc2", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))
+                )
+        );
+
+        HashMap<String, Object> input = new HashMap<String, Object>() {{
+            put("binaryc", new byte[]{1, 2});
+            put("binaryc2", "Hello");
+        }};
+
+
+        Object[] expected = new Object[]{
+                new byte[]{1, 2},
+                "Hello".getBytes(StandardCharsets.UTF_8)
+        };
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+        assertArrayEquals(expected, ((List)deserialized).toArray());
+    }
+
+    @Test
+    public void testStructField() throws SerDeException{
+        NiFiRecordSerDe serDe = createSerDe("structc",
+                "struct<age:int,name:string>"
+        );
+        RecordSchema innerSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("age", RecordFieldType.INT.getDataType()),
+                new RecordField("name", RecordFieldType.STRING.getDataType())
+        ));
+        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("structc", RecordFieldType.RECORD.getRecordDataType(innerSchema))
+        ));
+
+        HashMap<String, Object> value = new HashMap<String, Object>() {{
+            put("structc", new MapRecord(innerSchema, new HashMap<String, Object>() {{
+                put("age", 15);
+                put("name", "gideon");
+            }}));
+        }};
+
+        List<Object> expected = Collections.singletonList(
+                Arrays.asList(15, "gideon")
+        );
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, value)));
+
+        assertEquals(expected, deserialized);
+    }
+
+    @Test
+    public void testSimpleArray() throws SerDeException{
+        long now = System.currentTimeMillis();
+        Date hiveDate = new Date();
+        hiveDate.setTimeInMillis(now);
+        Timestamp hiveTs = new Timestamp();
+        hiveTs.setTimeInMillis(now);
+
+        testSimpleArray("tinyint", RecordFieldType.BYTE.getDataType(), new Byte[] { 5, 29 },
+                new Byte[] { 5, 29 });
+        testSimpleArray("smallint", RecordFieldType.SHORT.getDataType(), new Short[] { 5, 29 },
+                new Short[] { 5, 29 });
+        testSimpleArray("int", RecordFieldType.INT.getDataType(), new Object[] { 1, 2, 3 ,4, 5 },
+                new Object[] { 1, 2, 3, 4, 5 });
+        testSimpleArray("bigint", RecordFieldType.LONG.getDataType(), new Object[] { 298767L, 89876L },
+                new Object[] { 298767L, 89876L });
+        testSimpleArray("boolean", RecordFieldType.BOOLEAN.getDataType(), new Object[] { true, false },
+                new Object[] { true, false });
+        testSimpleArray("float", RecordFieldType.FLOAT.getDataType(), new Object[] { 1.23f, 3.14f },
+                new Object[] { 1.23f, 3.14f });
+        testSimpleArray("double", RecordFieldType.DOUBLE.getDataType(), new Object[] { 1.235, 3.142, 1.0 },
+                new Object[] { 1.235, 3.142, 1.0 });
+        testSimpleArray("string", RecordFieldType.STRING.getDataType(), new Object[] { "sasa", "wewe" },
+                new Object[] { "sasa", "wewe" });
+        testSimpleArray("varchar(20)", RecordFieldType.STRING.getDataType(), new Object[] { "niko", "fiti", "sema"},
+                new Object[]  { "niko", "fiti", "sema" });
+        testSimpleArray("char(1)", RecordFieldType.CHAR.getDataType(), new Object[] { 'a', 'b', 'c' },
+                new Object[] { "a", "b", "c"});
+        testSimpleArray("date", RecordFieldType.DATE.getDataType(), new Object[] { new java.sql.Date(now)},
+                new Object[] { hiveDate });
+        testSimpleArray("timestamp", RecordFieldType.TIMESTAMP.getDataType(), new Object[] { new java.sql.Timestamp(now)},
+                new Object[] { hiveTs });
+        testSimpleArray("decimal(10,2)", RecordFieldType.DOUBLE.getDataType(), new Object[] { 3.45, 1.25 },
+                new Object[] { HiveDecimal.create(3.45), HiveDecimal.create(1.25)});
+    }
+
+    public void testSimpleArray(String typeName, DataType elementDataType, Object[] values, Object[] expected) throws SerDeException {
+        NiFiRecordSerDe serDe = createSerDe("listc",
+                "array<" + typeName + ">"
+        );
+
+        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(elementDataType))
+        ));
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<String, Object>() {{
+            put("listc", values);
+        }})));
+
+        List<Object> fields = (List<Object>)deserialized;
+        assertEquals(1, fields.size());
+        List<Object> nested = (List<Object>) fields.get(0);
+
+        for(int i=0; i<expected.length; i++){
+            assertEquals(expected[i], nested.get(i));
+        }
+    }
+
+    @Test
+    public void testStructArray() throws SerDeException{
+        NiFiRecordSerDe serDe = createSerDe("listc",
+                "array<struct<age:int,name:string>>"
+        );
+        RecordSchema innerSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("age", RecordFieldType.INT.getDataType()),
+                new RecordField("name", RecordFieldType.STRING.getDataType())
+        ));
+        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchema)))
+        ));
+        HashMap<String, Object> input = new HashMap<String, Object>() {{
+            put("listc", new Record[]{new MapRecord(innerSchema, new HashMap<String, Object>() {{
+                put("age", 15);
+                put("name", "gideon");
+            }}),
+                    new MapRecord(innerSchema, new HashMap<String, Object>() {{
+                        put("age", 87);
+                        put("name", "cucu");
+                    }})
+            });
+        }};
+
+        Object expected = Collections.singletonList(
+                Arrays.asList(
+                        Arrays.asList(15, "gideon"),
+                        Arrays.asList(87, "cucu")
+                )
+        );
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+        assertEquals(expected, deserialized);
+    }
+
+    @Test
+    public void testSimpleMap() throws SerDeException{
+        testSimpleMap("string", "tinyint", RecordFieldType.BYTE.getDataType(), createMap((byte)89, (byte)2), objectMap(createMap((byte)89, (byte)2)));
+        testSimpleMap("string", "smallint", RecordFieldType.SHORT.getDataType(), createMap((short)89, (short)209), objectMap(createMap((short)89, (short)209)));
+        testSimpleMap("string", "int", RecordFieldType.INT.getDataType(), createMap(90, 87), objectMap(createMap(90, 87)));
+        testSimpleMap("string", "bigint", RecordFieldType.BIGINT.getDataType(), createMap(87888L, 876L, 123L), objectMap(createMap(87888L, 876L, 123L)));
+        testSimpleMap("string", "boolean", RecordFieldType.BOOLEAN.getDataType(), createMap(false, true, true, false), objectMap(createMap(false, true, true, false)));
+        testSimpleMap("string", "float", RecordFieldType.FLOAT.getDataType(), createMap(1.2f, 8.6f, 0.125f), objectMap(createMap(1.2f, 8.6f, 0.125f)));
+        testSimpleMap("string", "double", RecordFieldType.DOUBLE.getDataType(), createMap(3.142, 8.93), objectMap(createMap(3.142, 8.93)));
+        testSimpleMap("string", "string", RecordFieldType.STRING.getDataType(), createMap("form", "ni", "aje"), objectMap(createMap("form", "ni", "aje")));
+        testSimpleMap("string", "varchar(20)", RecordFieldType.STRING.getDataType(), createMap("niko", "kiza"), objectMap(createMap("niko", "kiza")));
+        testSimpleMap("string", "char(1)", RecordFieldType.CHAR.getDataType(), createMap('a', 'b', 'c'), objectMap(createMap("a", "b", "c")));
+        long now = System.currentTimeMillis();
+        Date hiveDate = new Date();
+        hiveDate.setTimeInMillis(now);
+        Timestamp hiveTs = new Timestamp();
+        hiveTs.setTimeInMillis(now);
+
+        testSimpleMap("string", "date", RecordFieldType.DATE.getDataType(), createMap(new java.sql.Date(now)), objectMap(createMap(hiveDate)));
+        testSimpleMap("string", "timestamp", RecordFieldType.TIMESTAMP.getDataType(), createMap(new java.sql.Timestamp(now)), objectMap(createMap(hiveTs)));
+        testSimpleMap("string", "decimal(10,2)", RecordFieldType.DOUBLE.getDataType(), createMap(45.6, 2345.5), objectMap(createMap(
+                HiveDecimal.create(45.6), HiveDecimal.create(2345.5)
+        )));
+    }
+
+    private Map<String,Object> createMap(Object... keyValues){
+        Map<String,Object> map = new HashMap<>(keyValues.length);
+        for(int i=0; i<keyValues.length; i++){
+            map.put("key." + i, keyValues[i]);
+        }
+        return  map;
+    }
+
+    Map<Object,Object> objectMap(Map<String,Object> input){
+        return new HashMap<>(input);
+    }
+
+    private void testSimpleMap(String keyType, String valueType, DataType fieldType, Map<String, Object> fields, Map<Object, Object> expected) throws SerDeException{
+        NiFiRecordSerDe serDe = createSerDe("mapc",
+                "map<" + keyType + "," + valueType + ">"
+        );
+        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("mapc", RecordFieldType.MAP.getMapDataType(fieldType))
+        ));
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<String, Object>(){
+            {
+                put("mapc", fields);
+            }
+        })));
+        List<Object> desFields = (List<Object>)deserialized;
+        assertEquals(1, desFields.size());
+        Map<Object,Object> map = (Map<Object, Object>)desFields.get(0);
+        for(Map.Entry<Object, Object> entry: expected.entrySet()){
+            assertEquals(entry.getValue(), map.get(entry.getKey()));
+        }
+    }
+
+    @Test
+    public void testStructMap() throws SerDeException{
+        NiFiRecordSerDe serDe = createSerDe(
+                "mapc",
+                "map<string,struct<id:int,balance:decimal(18,2)>>"
+        );
+        RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("mapc", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema)))
+        ));
+
+        HashMap<String, Object> input = new HashMap<String, Object>() {{
+            put("mapc", new HashMap<String, Object>() {{
+                put("current", new MapRecord(recordSchema, new HashMap<String, Object>() {{
+                    put("id", 1);
+                    put("balance", 56.9);
+                }}));
+                put("savings", new MapRecord(recordSchema, new HashMap<String, Object>() {{
+                    put("id", 2);
+                    put("balance", 104.65);
+                }}));
+            }});
+        }};
+
+        Object expected = Collections.singletonList(
+                new HashMap<String, Object>() {{
+                    put("current", Arrays.asList(1, HiveDecimal.create(56.9)));
+                    put("savings", Arrays.asList(2, HiveDecimal.create(104.65)));
+                }}
+        );
+
+        Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+        assertEquals(expected, deserialized);
+    }
+
+    NiFiRecordSerDe createSerDe(String columnNames, String typeInfo) throws SerDeException{
+        Properties props = new Properties();
+        props.setProperty(serdeConstants.LIST_COLUMNS, columnNames);
+        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, typeInfo);
+        NiFiRecordSerDe serDe = new NiFiRecordSerDe(null, new MockComponentLog("logger", new Object())); //reader isn't used
+        serDe.initialize(null, props); //conf isn't used
+        return  serDe;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 4f49932..137becc 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -53,18 +53,24 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -80,6 +86,8 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.Arrays;
@@ -204,27 +212,29 @@ public class TestPutHive3Streaming {
         final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/array_of_records.avsc"), StandardCharsets.UTF_8);
         schema = new Schema.Parser().parse(avroSchema);
         processor.setFields(Arrays.asList(new FieldSchema("records",
-                serdeConstants.LIST_TYPE_NAME + "<"
-                        + serdeConstants.MAP_TYPE_NAME + "<"
-                        + serdeConstants.STRING_TYPE_NAME + ","
-                        +  serdeConstants.STRING_TYPE_NAME + ">>", "")));
+                "array<struct<name:string,age:string>>", "")));
         runner = TestRunners.newTestRunner(processor);
         runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
         MockRecordParser readerFactory = new MockRecordParser();
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
         for (final RecordField recordField : recordSchema.getFields()) {
-            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+            //add the recordField so that we don't lose the element data type
+            readerFactory.addSchemaField(recordField);
         }
 
         if (recordGenerator == null) {
-            Object[] mapArray = new Object[numUsers];
+            //given the schema is array of records we need the
+            //array in the records field to contain Record objects
+            MapRecord[] mapArray = new MapRecord[numUsers];
+            ArrayDataType recordsDataType = (ArrayDataType)recordSchema.getField("records").get().getDataType();
+            RecordDataType nestedStructType = (RecordDataType)recordsDataType.getElementType();
             for (int i = 0; i < numUsers; i++) {
                 final int x = i;
                 Map<String, Object> map = new HashMap<String, Object>() {{
                     put("name", "name" + x);
                     put("age", x * 5);
                 }};
-                mapArray[i] = map;
+                mapArray[i] = new MapRecord(nestedStructType.getChildSchema(), map);
             }
             readerFactory.addRecord((Object)mapArray);
         } else {
@@ -757,7 +767,9 @@ public class TestPutHive3Streaming {
         MockRecordParser readerFactory = new MockRecordParser();
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
         for (final RecordField recordField : recordSchema.getFields()) {
-            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+            //add the schema field so that we don't lose the map value data type for
+            //the mapc field and the element type for the listc field
+            readerFactory.addSchemaField(recordField);
         }
 
         List<String> enumc = Arrays.asList("SPADES", "HEARTS", "DIAMONDS", "CLUBS");
@@ -960,6 +972,73 @@ public class TestPutHive3Streaming {
     }
 
     @Test
+    public void testNestedRecords() throws Exception {
+        runner = TestRunners.newTestRunner(processor);
+        MockRecordParser readerFactory = new MockRecordParser();
+
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/nested_record.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField);
+        }
+
+        Map<String,Object> nestedRecordMap = new HashMap<>();
+        nestedRecordMap.put("id", 11088000000001615L);
+        nestedRecordMap.put("x", "Hello World!");
+
+        RecordSchema nestedRecordSchema = AvroTypeUtil.createSchema(schema.getField("myField").schema());
+        MapRecord nestedRecord = new MapRecord(nestedRecordSchema, nestedRecordMap);
+        // This gets added in to its spot in the schema, which is already named "myField"
+        readerFactory.addRecord(nestedRecord);
+
+        runner.addControllerService("mock-reader-factory", readerFactory);
+        runner.enableControllerService(readerFactory);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+
+        runner.enqueue("trigger");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHive3Streaming.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testValidateNestedMap() throws InitializationException, IOException {
+        final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/nested-map-schema.avsc")), StandardCharsets.UTF_8);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("reader", jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", validWriter);
+        runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.enableControllerService(validWriter);
+
+        final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
+        runner.addControllerService("invalid-writer", invalidWriter);
+        runner.enableControllerService(invalidWriter);
+
+        runner.setProperty(PutHive3Streaming.RECORD_READER, "reader");
+        runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+        runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+        runner.enqueue(Paths.get("src/test/resources/nested-map-input.json"));
+        runner.run();
+
+        runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void cleanup() {
         processor.cleanup();
     }
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-input.json b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-input.json
new file mode 100644
index 0000000..ed8dc3b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-input.json
@@ -0,0 +1,18 @@
+[{
+  "images" : [ {
+    "headers" : {
+      "Accept-Ranges" : "bytes",
+      "Server" : "Apache"
+    },
+    "id" : "205f40e3-2675-4c61-abc1-a0aeb609c023"
+  } ]
+},
+{
+  "images" : [ {
+    "headers" : {
+      "Accept-Ranges" : 4,
+      "Server" : 10
+    },
+    "id" : "205f40e3-2675-4c61-abc1-a0aeb609c023"
+  } ]
+}]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-schema.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-schema.avsc
new file mode 100644
index 0000000..3cb1a92
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-schema.avsc
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ {
+  "type": "record",
+  "name": "example",
+  "fields": [
+    {
+      "name": "images",
+      "type": {
+        "type": "array",
+        "items": {
+          "type": "record",
+          "name": "images",
+          "fields": [
+            {
+              "name": "id",
+              "type": [
+                "string",
+                "null"
+              ]
+            },
+            {
+              "name": "headers",
+              "type": {
+                "type": "map",
+                "values": "string"
+              }
+            }
+          ]
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file