Posted to commits@nifi.apache.org by ma...@apache.org on 2019/09/13 13:09:29 UTC
[nifi] branch master updated: fix deserialization issues with NiFiRecordSerDe for hive3streaming
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1bb09 fix deserialization issues with NiFiRecordSerDe for hive3streaming
3d1bb09 is described below
commit 3d1bb09ff85462b08ffadcd2cbc8a5d6036a7cdc
Author: korir <gi...@gmail.com>
AuthorDate: Fri May 31 19:50:09 2019 +0300
fix deserialization issues with NiFiRecordSerDe for hive3streaming
NIFI-6295: Refactored NiFiRecordSerDe to handle nested complex types
NIFI-6295: Incorporated review comments
NIFI-6295: Refactored unit tests for NiFiRecordSerDe
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3684
---
.../nifi-hive-bundle/nifi-hive3-processors/pom.xml | 33 +-
.../org/apache/hive/streaming/NiFiRecordSerDe.java | 248 +++++++-------
.../apache/hive/streaming/TestNiFiRecordSerDe.java | 373 +++++++++++++++++++++
.../processors/hive/TestPutHive3Streaming.java | 95 +++++-
.../src/test/resources/nested-map-input.json | 18 +
.../src/test/resources/nested-map-schema.avsc | 48 +++
6 files changed, 679 insertions(+), 136 deletions(-)
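
At a high level, the change replaces the up-front field-position map with a recursive deserialize step: each NiFi Record field is converted into the positional Java objects that Hive's StandardStructObjectInspector expects, recursing into nested structs, lists and maps. Below is a simplified, standalone sketch of that recursion in plain Java with no NiFi or Hive dependencies; the RecursiveDeserializeSketch and StructType names are illustrative stand-ins, not the actual classes touched by this commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Standalone sketch of the recursive conversion this commit introduces:
 * a record arrives as field-name -> value pairs, while Hive's struct object
 * inspector expects a positional List ordered like the target table schema.
 * Nested records become nested Lists by recursing with the nested struct type.
 * StructType is an illustrative stand-in, not Hive's StructTypeInfo.
 */
public class RecursiveDeserializeSketch {

    /** Minimal stand-in for a Hive struct type: ordered field names and their types. */
    static class StructType {
        final List<String> fieldNames;
        final Map<String, Object> fieldTypes; // String marker for primitives, StructType for nested structs

        StructType(List<String> fieldNames, Map<String, Object> fieldTypes) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
        }
    }

    /** Convert a name->value record into the positional row the struct inspector expects. */
    static List<Object> deserialize(Map<String, Object> record, StructType schema) {
        List<Object> row = new ArrayList<>();
        for (String name : schema.fieldNames) {
            row.add(convert(record.get(name), schema.fieldTypes.get(name)));
        }
        return row;
    }

    /** Recurse into nested structs; primitives pass through (the real SerDe coerces them via DataTypeUtils). */
    @SuppressWarnings("unchecked")
    static Object convert(Object value, Object type) {
        if (value == null) {
            return null;
        }
        if (type instanceof StructType) {
            return deserialize((Map<String, Object>) value, (StructType) type);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> addressTypes = new LinkedHashMap<>();
        addressTypes.put("city", "string");
        addressTypes.put("zip", "string");
        StructType address = new StructType(Arrays.asList("city", "zip"), addressTypes);

        Map<String, Object> personTypes = new LinkedHashMap<>();
        personTypes.put("name", "string");
        personTypes.put("address", address);
        StructType person = new StructType(Arrays.asList("name", "address"), personTypes);

        Map<String, Object> nested = new HashMap<>();
        nested.put("city", "Nairobi");
        nested.put("zip", "00100");
        Map<String, Object> record = new HashMap<>();
        record.put("name", "gideon");
        record.put("address", nested);

        System.out.println(deserialize(record, person)); // [gideon, [Nairobi, 00100]]
    }
}
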
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
index e790283..38f72a5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/pom.xml
@@ -13,7 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -140,5 +140,36 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-schema-registry-service-api</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/nested-map-input.json</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
index e628474..51a06c8 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hive/streaming/NiFiRecordSerDe.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -38,14 +40,17 @@ import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.TimestampParser;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import java.io.IOException;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
@@ -57,6 +62,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
public class NiFiRecordSerDe extends AbstractSerDe {
@@ -71,8 +77,6 @@ public class NiFiRecordSerDe extends AbstractSerDe {
private final static Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");
- private Map<String, Integer> fieldPositionMap;
-
public NiFiRecordSerDe(RecordReader recordReader, ComponentLog log) {
this.recordReader = recordReader;
this.log = log;
@@ -114,12 +118,6 @@ public class NiFiRecordSerDe extends AbstractSerDe {
log.debug("schema : {}", new Object[]{schema});
cachedObjectInspector = (StandardStructObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);
tsParser = new TimestampParser(HiveStringUtils.splitAndUnEscape(tbl.getProperty(serdeConstants.TIMESTAMP_FORMATS)));
- // Populate mapping of field names to column positions
- try {
- populateFieldPositionMap();
- } catch (MalformedRecordException | IOException e) {
- throw new SerDeException(e);
- }
stats = new SerDeStats();
}
@@ -142,43 +140,40 @@ public class NiFiRecordSerDe extends AbstractSerDe {
public Object deserialize(Writable writable) throws SerDeException {
ObjectWritable t = (ObjectWritable) writable;
Record record = (Record) t.get();
- List<Object> r = new ArrayList<>(Collections.nCopies(columnNames.size(), null));
+
+ List<Object> result = deserialize(record, schema);
+
+ stats.setRowCount(stats.getRowCount() + 1);
+
+ return result;
+ }
+
+ private List<Object> deserialize(Record record, StructTypeInfo schema) throws SerDeException {
+ List<Object> result = new ArrayList<>(Collections.nCopies(schema.getAllStructFieldNames().size(), null));
+
try {
RecordSchema recordSchema = record.getSchema();
for (RecordField field : recordSchema.getFields()) {
- String fieldName = field.getFieldName();
- String normalizedFieldName = fieldName.toLowerCase();
-
- // Get column position of field name, and set field value there
- Integer fpos = fieldPositionMap.get(normalizedFieldName);
- if(fpos == null || fpos == -1) {
- // This is either a partition column or not a column in the target table, ignore either way
- continue;
- }
- Object currField = extractCurrentField(record, field, schema.getStructFieldTypeInfo(normalizedFieldName));
- r.set(fpos, currField);
+ populateRecord(result, record.getValue(field), field, schema);
}
- stats.setRowCount(stats.getRowCount() + 1);
-
+ } catch(SerDeException se) {
+ log.error("Error [{}] parsing Record [{}].", new Object[]{se.toString(), record}, se);
+ throw se;
} catch (Exception e) {
- log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), t}, e);
+ log.error("Error [{}] parsing Record [{}].", new Object[]{e.toString(), record}, e);
throw new SerDeException(e);
}
- return r;
+ return result;
}
- /**
- * Utility method to extract current expected field from given record.
- */
@SuppressWarnings("unchecked")
- private Object extractCurrentField(Record record, RecordField field, TypeInfo fieldTypeInfo) throws SerDeException {
- Object val;
- if (field == null) {
+ private Object extractCurrentField(final Object fieldValue, final String fieldName, final DataType fieldDataType, final TypeInfo fieldTypeInfo) throws SerDeException {
+ if(fieldValue == null){
return null;
}
- String fieldName = field.getFieldName();
+ Object val;
switch (fieldTypeInfo.getCategory()) {
case PRIMITIVE:
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN;
@@ -187,152 +182,151 @@ public class NiFiRecordSerDe extends AbstractSerDe {
}
switch (primitiveCategory) {
case BYTE:
- Integer bIntValue = record.getAsInt(fieldName);
- val = bIntValue == null ? null : bIntValue.byteValue();
+ Integer bIntValue = DataTypeUtils.toInteger(fieldValue, fieldName);
+ val = bIntValue.byteValue();
break;
case SHORT:
- Integer sIntValue = record.getAsInt(fieldName);
- val = sIntValue == null ? null : sIntValue.shortValue();
+ Integer sIntValue = DataTypeUtils.toInteger(fieldValue, fieldName);
+ val = sIntValue.shortValue();
break;
case INT:
- val = record.getAsInt(fieldName);
+ val = DataTypeUtils.toInteger(fieldValue, fieldName);
break;
case LONG:
- val = record.getAsLong(fieldName);
+ val = DataTypeUtils.toLong(fieldValue, fieldName);
break;
case BOOLEAN:
- val = record.getAsBoolean(fieldName);
+ val = DataTypeUtils.toBoolean(fieldValue, fieldName);
break;
case FLOAT:
- val = record.getAsFloat(fieldName);
+ val = DataTypeUtils.toFloat(fieldValue, fieldName);
break;
case DOUBLE:
- val = record.getAsDouble(fieldName);
+ val = DataTypeUtils.toDouble(fieldValue, fieldName);
break;
case STRING:
case VARCHAR:
case CHAR:
- val = record.getAsString(fieldName);
+ val = DataTypeUtils.toString(fieldValue, fieldName);
break;
case BINARY:
- Object[] array = record.getAsArray(fieldName);
- if (array == null) {
- return null;
+ final ArrayDataType arrayDataType;
+ if(fieldValue instanceof String) {
+ // Treat this as an array of bytes
+ arrayDataType = (ArrayDataType) RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+ } else {
+ arrayDataType = (ArrayDataType) fieldDataType;
}
+ Object[] array = DataTypeUtils.toArray(fieldValue, fieldName, arrayDataType.getElementType());
val = AvroTypeUtil.convertByteArray(array).array();
break;
case DATE:
- Date d = record.getAsDate(fieldName, field.getDataType().getFormat());
- if(d != null) {
- org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
- hiveDate.setTimeInMillis(d.getTime());
- val = hiveDate;
- } else {
- val = null;
- }
+ Date d = DataTypeUtils.toDate(fieldValue, () -> DataTypeUtils.getDateFormat(fieldDataType.getFormat()), fieldName);
+ org.apache.hadoop.hive.common.type.Date hiveDate = new org.apache.hadoop.hive.common.type.Date();
+ hiveDate.setTimeInMillis(d.getTime());
+ val = hiveDate;
break;
// ORC doesn't currently handle TIMESTAMPLOCALTZ
case TIMESTAMP:
- Timestamp ts = DataTypeUtils.toTimestamp(record.getValue(fieldName), () -> DataTypeUtils.getDateFormat(field.getDataType().getFormat()), fieldName);
- if(ts != null) {
- // Convert to Hive's Timestamp type
- org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
- hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
- val = hivetimestamp;
- } else {
- val = null;
- }
+ Timestamp ts = DataTypeUtils.toTimestamp(fieldValue, () -> DataTypeUtils.getDateFormat(fieldDataType.getFormat()), fieldName);
+ // Convert to Hive's Timestamp type
+ org.apache.hadoop.hive.common.type.Timestamp hivetimestamp = new org.apache.hadoop.hive.common.type.Timestamp();
+ hivetimestamp.setTimeInMillis(ts.getTime(), ts.getNanos());
+ val = hivetimestamp;
break;
case DECIMAL:
- Double value = record.getAsDouble(fieldName);
- val = value == null ? null : HiveDecimal.create(value);
+ if(fieldValue instanceof BigDecimal){
+ val = HiveDecimal.create((BigDecimal) fieldValue);
+ } else if (fieldValue instanceof Number) {
+ val = HiveDecimal.create(((Number)fieldValue).doubleValue());
+ } else {
+ val = HiveDecimal.create(DataTypeUtils.toDouble(fieldValue, fieldDataType.getFormat()));
+ }
break;
default:
throw new IllegalArgumentException("Field " + fieldName + " cannot be converted to type: " + primitiveCategory.name());
}
break;
case LIST:
- Object[] value = record.getAsArray(fieldName);
- val = value == null ? null : Arrays.asList(value);
+ Object[] value = (Object[])fieldValue;
+ ListTypeInfo listTypeInfo = (ListTypeInfo)fieldTypeInfo;
+ TypeInfo nestedType = listTypeInfo.getListElementTypeInfo();
+ List<Object> converted = new ArrayList<>(value.length);
+ for (Object o : value) {
+ converted.add(extractCurrentField(o, fieldName, ((ArrayDataType) fieldDataType).getElementType(), nestedType));
+ }
+ val = converted;
break;
case MAP:
- val = record.getValue(fieldName);
+ //In NiFi all map keys are Strings (Map<String, ?>), so cast accordingly
+ Map<String, Object> valueMap = (Map<String,Object>)fieldValue;
+ MapTypeInfo mapTypeInfo = (MapTypeInfo)fieldTypeInfo;
+ Map<Object, Object> convertedMap = new HashMap<>(valueMap.size());
+ //NiFi map keys are always Strings; synthesize data types for the map
+ //key and value so each entry can be converted recursively.
+ for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+ convertedMap.put(
+ extractCurrentField(entry.getKey(), fieldName + ".key", RecordFieldType.STRING.getDataType(), mapTypeInfo.getMapKeyTypeInfo()),
+ extractCurrentField(entry.getValue(), fieldName + ".value", ((MapDataType) fieldDataType).getValueType(), mapTypeInfo.getMapValueTypeInfo())
+ );
+ }
+ val = convertedMap;
break;
case STRUCT:
- // The Hive StandardStructObjectInspector expects the object corresponding to a "struct" to be an array or List rather than a Map.
- // Do the conversion here, calling extractCurrentField recursively to traverse any nested structs.
- Record nestedRecord = (Record) record.getValue(fieldName);
- if (nestedRecord == null) {
- return null;
- }
- try {
- RecordSchema recordSchema = nestedRecord.getSchema();
- List<RecordField> recordFields = recordSchema.getFields();
- if (recordFields == null || recordFields.isEmpty()) {
- return Collections.emptyList();
- }
- // This List will hold the values of the entries in the Map
- List<Object> structList = new ArrayList<>(recordFields.size());
- StructTypeInfo typeInfo = (StructTypeInfo) schema.getStructFieldTypeInfo(fieldName);
- for (RecordField nestedRecordField : recordFields) {
- String fName = nestedRecordField.getFieldName();
- String normalizedFieldName = fName.toLowerCase();
- structList.add(extractCurrentField(nestedRecord, nestedRecordField, typeInfo.getStructFieldTypeInfo(normalizedFieldName)));
- }
- return structList;
- } catch (Exception e) {
- log.warn("Error [{}] parsing Record [{}].", new Object[]{e.toString(), nestedRecord}, e);
- throw new SerDeException(e);
- }
- // break unreachable
+ Record nestedRecord = (Record) fieldValue;
+ StructTypeInfo s = (StructTypeInfo) fieldTypeInfo;
+ val = deserialize(nestedRecord, s);
+ break;
default:
- log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + field.getDataType().toString());
+ log.error("Unknown type found: " + fieldTypeInfo + "for field of type: " + fieldDataType.toString());
return null;
}
return val;
}
+
+
@Override
public ObjectInspector getObjectInspector() {
return cachedObjectInspector;
}
- private void populateFieldPositionMap() throws MalformedRecordException, IOException {
- // Populate the mapping of field names to column positions only once
- fieldPositionMap = new HashMap<>(columnNames.size());
- RecordSchema recordSchema = recordReader.getSchema();
- for (RecordField field : recordSchema.getFields()) {
- String fieldName = field.getFieldName();
- String normalizedFieldName = fieldName.toLowerCase();
- int fpos = schema.getAllStructFieldNames().indexOf(fieldName.toLowerCase());
+ private void populateRecord(List<Object> r, Object value, RecordField field, StructTypeInfo typeInfo) throws SerDeException {
+
+ String fieldName = field.getFieldName();
+ String normalizedFieldName = fieldName.toLowerCase();
+
+ // Normalize struct field names and search for the specified (normalized) field name
+ int fpos = typeInfo.getAllStructFieldNames().stream().map((s) -> s == null ? null : s.toLowerCase()).collect(Collectors.toList()).indexOf(normalizedFieldName);
+ if (fpos == -1) {
+ Matcher m = INTERNAL_PATTERN.matcher(fieldName);
+ fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
+
+ log.debug("NPE finding position for field [{}] in schema [{}],"
+ + " attempting to check if it is an internal column name like _col0", new Object[]{fieldName, typeInfo});
if (fpos == -1) {
- Matcher m = INTERNAL_PATTERN.matcher(fieldName);
- fpos = m.matches() ? Integer.parseInt(m.group(1)) : -1;
-
- log.debug("NPE finding position for field [{}] in schema [{}],"
- + " attempting to check if it is an internal column name like _col0", new Object[]{fieldName, schema});
- if (fpos == -1) {
- // unknown field, we return. We'll continue from the next field onwards. Log at debug level because partition columns will be "unknown fields"
- log.debug("Field {} is not found in the target table, ignoring...", new Object[]{field.getFieldName()});
- continue;
- }
- // If we get past this, then the column name did match the hive pattern for an internal
- // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
- // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
- // if we find it.
- if (!fieldName.equalsIgnoreCase(HiveConf.getColumnInternalName(fpos))) {
- log.error("Hive internal column name {} and position "
- + "encoding {} for the column name are at odds", new Object[]{fieldName, fpos});
- throw new IOException("Hive internal column name (" + fieldName
- + ") and position encoding (" + fpos
- + ") for the column name are at odds");
- }
- // If we reached here, then we were successful at finding an alternate internal
- // column mapping, and we're about to proceed.
+ // unknown field, we return. We'll continue from the next field onwards. Log at debug level because partition columns will be "unknown fields"
+ log.debug("Field {} is not found in the target table, ignoring...", new Object[]{field.getFieldName()});
+ return;
}
- fieldPositionMap.put(normalizedFieldName, fpos);
+ // If we get past this, then the column name did match the hive pattern for an internal
+ // column name, such as _col0, etc, so it *MUST* match the schema for the appropriate column.
+ // This means people can't use arbitrary column names such as _col0, and expect us to ignore it
+ // if we find it.
+ if (!fieldName.equalsIgnoreCase(HiveConf.getColumnInternalName(fpos))) {
+ log.error("Hive internal column name {} and position "
+ + "encoding {} for the column name are at odds", new Object[]{fieldName, fpos});
+ throw new SerDeException("Hive internal column name (" + fieldName
+ + ") and position encoding (" + fpos
+ + ") for the column name are at odds");
+ }
+ // If we reached here, then we were successful at finding an alternate internal
+ // column mapping, and we're about to proceed.
}
+ Object currField = extractCurrentField(value, fieldName, field.getDataType(), typeInfo.getStructFieldTypeInfo(normalizedFieldName));
+ r.set(fpos, currField);
}
+
}
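
The populateRecord() method above keeps the previous behaviour of skipping unknown fields (such as partition columns) and of accepting Hive's internal _colN column names, but the case-insensitive position lookup now happens per field on each record instead of being cached once. A standalone sketch of that lookup follows; it assumes HiveConf.getColumnInternalName(pos) resolves to "_col" + pos (the real code calls HiveConf directly), and the ColumnPositionSketch name is illustrative.

import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Standalone sketch of the column-position lookup done per field in
 * populateRecord(): match the lower-cased record field name against the
 * table's struct field names, then fall back to Hive's internal "_colN"
 * naming when there is no direct match. Unknown fields (such as partition
 * columns) resolve to -1 and are simply skipped by the caller.
 */
public class ColumnPositionSketch {

    private static final Pattern INTERNAL_PATTERN = Pattern.compile("_col([0-9]+)");

    /** Returns the column index for fieldName, or -1 if the field should be ignored. */
    static int findPosition(String fieldName, List<String> structFieldNames) {
        String normalized = fieldName.toLowerCase();
        for (int i = 0; i < structFieldNames.size(); i++) {
            String column = structFieldNames.get(i);
            if (column != null && column.toLowerCase().equals(normalized)) {
                return i;
            }
        }
        // No direct match: accept Hive's internal column names like _col0, _col1, ...
        Matcher m = INTERNAL_PATTERN.matcher(fieldName);
        if (!m.matches()) {
            return -1; // unknown field, e.g. a partition column: ignore
        }
        int pos = Integer.parseInt(m.group(1));
        // The real code validates against HiveConf.getColumnInternalName(pos) and throws
        // if they disagree; "_col" + pos is assumed here for illustration.
        if (!fieldName.equalsIgnoreCase("_col" + pos)) {
            throw new IllegalArgumentException(
                    "Internal column name " + fieldName + " and position " + pos + " are at odds");
        }
        return pos;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name", "balance");
        System.out.println(findPosition("NAME", columns));         // 1 (case-insensitive match)
        System.out.println(findPosition("_col2", columns));        // 2 (internal column name)
        System.out.println(findPosition("partition_dt", columns)); // -1 (ignored)
    }
}
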
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java
new file mode 100644
index 0000000..2d22571
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/hive/streaming/TestNiFiRecordSerDe.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestNiFiRecordSerDe {
+
+ @Test
+ public void testSimpleFields() throws SerDeException {
+ NiFiRecordSerDe serDe = createSerDe(
+ "bytec,shortc,intc,longc,boolc,floatc,doublec,stringc,varcharc,charc,datec,timestampc,decimalc",
+ "tinyint:smallint:int:bigint:boolean:float:double:string:varchar(50):char(1):date:timestamp:decimal"
+ );
+ RecordSchema schema = new SimpleRecordSchema(
+ Arrays.asList(
+ new RecordField("bytec", RecordFieldType.BYTE.getDataType()),
+ new RecordField("shortc", RecordFieldType.SHORT.getDataType()),
+ new RecordField("intc", RecordFieldType.INT.getDataType()),
+ new RecordField("longc", RecordFieldType.LONG.getDataType()),
+ new RecordField("boolc", RecordFieldType.BOOLEAN.getDataType()),
+ new RecordField("floatc", RecordFieldType.FLOAT.getDataType()),
+ new RecordField("doublec", RecordFieldType.DOUBLE.getDataType()),
+ new RecordField("stringc", RecordFieldType.STRING.getDataType()),
+ new RecordField("varcharc", RecordFieldType.STRING.getDataType()),
+ new RecordField("charc", RecordFieldType.CHAR.getDataType()),
+ new RecordField("datec", RecordFieldType.DATE.getDataType("yyyy-MM-dd")),
+ new RecordField("timestampc", RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd HH:mm:ss")),
+ new RecordField("decimalc", RecordFieldType.DOUBLE.getDataType())
+ )
+ );
+
+ long currentTimeMillis = System.currentTimeMillis();
+
+ HashMap<String, Object> input = new HashMap<String, Object>() {{
+ put("bytec", (byte) 2);
+ put("shortc", (short) 45);
+ put("intc", 95);
+ put("longc", 876L);
+ put("boolc", Boolean.TRUE);
+ put("floatc", 4.56f);
+ put("doublec", 2.3445);
+ put("stringc", "test");
+ put("varcharc", "test2");
+ put("charc", 'c');
+ put("datec", new java.sql.Date(currentTimeMillis));
+ put("timestampc", new java.sql.Timestamp(currentTimeMillis));
+ put("decimalc", 0.45);
+ }};
+
+ Date date = new Date();
+ date.setTimeInMillis(currentTimeMillis);
+ Timestamp ts = new Timestamp();
+ ts.setTimeInMillis(currentTimeMillis);
+
+ List<Object> expected = Arrays.asList(
+ Byte.valueOf("2"),
+ Short.valueOf("45"),
+ 95,
+ 876L,
+ Boolean.TRUE,
+ 4.56f,
+ 2.3445,
+ "test",
+ "test2",
+ "c",
+ date,
+ ts,
+ HiveDecimal.create("0.45")
+ );
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+ assertEquals(expected, deserialized);
+ }
+
+ @Test
+ public void testArrays() throws SerDeException {
+ NiFiRecordSerDe serDe = createSerDe(
+ "binaryc,binaryc2",
+ "binary:binary"
+ );
+ RecordSchema schema = new SimpleRecordSchema(
+ Arrays.asList(
+ new RecordField("binaryc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
+ new RecordField("binaryc2", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))
+ )
+ );
+
+ HashMap<String, Object> input = new HashMap<String, Object>() {{
+ put("binaryc", new byte[]{1, 2});
+ put("binaryc2", "Hello");
+ }};
+
+
+ Object[] expected = new Object[]{
+ new byte[]{1, 2},
+ "Hello".getBytes(StandardCharsets.UTF_8)
+ };
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+ assertArrayEquals(expected, ((List)deserialized).toArray());
+ }
+
+ @Test
+ public void testStructField() throws SerDeException{
+ NiFiRecordSerDe serDe = createSerDe("structc",
+ "struct<age:int,name:string>"
+ );
+ RecordSchema innerSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("age", RecordFieldType.INT.getDataType()),
+ new RecordField("name", RecordFieldType.STRING.getDataType())
+ ));
+ RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("structc", RecordFieldType.RECORD.getRecordDataType(innerSchema))
+ ));
+
+ HashMap<String, Object> value = new HashMap<String, Object>() {{
+ put("structc", new MapRecord(innerSchema, new HashMap<String, Object>() {{
+ put("age", 15);
+ put("name", "gideon");
+ }}));
+ }};
+
+ List<Object> expected = Collections.singletonList(
+ Arrays.asList(15, "gideon")
+ );
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, value)));
+
+ assertEquals(expected, deserialized);
+ }
+
+ @Test
+ public void testSimpleArray() throws SerDeException{
+ long now = System.currentTimeMillis();
+ Date hiveDate = new Date();
+ hiveDate.setTimeInMillis(now);
+ Timestamp hiveTs = new Timestamp();
+ hiveTs.setTimeInMillis(now);
+
+ testSimpleArray("tinyint", RecordFieldType.BYTE.getDataType(), new Byte[] { 5, 29 },
+ new Byte[] { 5, 29 });
+ testSimpleArray("smallint", RecordFieldType.SHORT.getDataType(), new Short[] { 5, 29 },
+ new Short[] { 5, 29 });
+ testSimpleArray("int", RecordFieldType.INT.getDataType(), new Object[] { 1, 2, 3 ,4, 5 },
+ new Object[] { 1, 2, 3, 4, 5 });
+ testSimpleArray("bigint", RecordFieldType.LONG.getDataType(), new Object[] { 298767L, 89876L },
+ new Object[] { 298767L, 89876L });
+ testSimpleArray("boolean", RecordFieldType.BOOLEAN.getDataType(), new Object[] { true, false },
+ new Object[] { true, false });
+ testSimpleArray("float", RecordFieldType.FLOAT.getDataType(), new Object[] { 1.23f, 3.14f },
+ new Object[] { 1.23f, 3.14f });
+ testSimpleArray("double", RecordFieldType.DOUBLE.getDataType(), new Object[] { 1.235, 3.142, 1.0 },
+ new Object[] { 1.235, 3.142, 1.0 });
+ testSimpleArray("string", RecordFieldType.STRING.getDataType(), new Object[] { "sasa", "wewe" },
+ new Object[] { "sasa", "wewe" });
+ testSimpleArray("varchar(20)", RecordFieldType.STRING.getDataType(), new Object[] { "niko", "fiti", "sema"},
+ new Object[] { "niko", "fiti", "sema" });
+ testSimpleArray("char(1)", RecordFieldType.CHAR.getDataType(), new Object[] { 'a', 'b', 'c' },
+ new Object[] { "a", "b", "c"});
+ testSimpleArray("date", RecordFieldType.DATE.getDataType(), new Object[] { new java.sql.Date(now)},
+ new Object[] { hiveDate });
+ testSimpleArray("timestamp", RecordFieldType.TIMESTAMP.getDataType(), new Object[] { new java.sql.Timestamp(now)},
+ new Object[] { hiveTs });
+ testSimpleArray("decimal(10,2)", RecordFieldType.DOUBLE.getDataType(), new Object[] { 3.45, 1.25 },
+ new Object[] { HiveDecimal.create(3.45), HiveDecimal.create(1.25)});
+ }
+
+ public void testSimpleArray(String typeName, DataType elementDataType, Object[] values, Object[] expected) throws SerDeException {
+ NiFiRecordSerDe serDe = createSerDe("listc",
+ "array<" + typeName + ">"
+ );
+
+ RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(elementDataType))
+ ));
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<String, Object>() {{
+ put("listc", values);
+ }})));
+
+ List<Object> fields = (List<Object>)deserialized;
+ assertEquals(1, fields.size());
+ List<Object> nested = (List<Object>) fields.get(0);
+
+ for(int i=0; i<expected.length; i++){
+ assertEquals(expected[i], nested.get(i));
+ }
+ }
+
+ @Test
+ public void testStructArray() throws SerDeException{
+ NiFiRecordSerDe serDe = createSerDe("listc",
+ "array<struct<age:int,name:string>>"
+ );
+ RecordSchema innerSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("age", RecordFieldType.INT.getDataType()),
+ new RecordField("name", RecordFieldType.STRING.getDataType())
+ ));
+ RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("listc", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(innerSchema)))
+ ));
+ HashMap<String, Object> input = new HashMap<String, Object>() {{
+ put("listc", new Record[]{new MapRecord(innerSchema, new HashMap<String, Object>() {{
+ put("age", 15);
+ put("name", "gideon");
+ }}),
+ new MapRecord(innerSchema, new HashMap<String, Object>() {{
+ put("age", 87);
+ put("name", "cucu");
+ }})
+ });
+ }};
+
+ Object expected = Collections.singletonList(
+ Arrays.asList(
+ Arrays.asList(15, "gideon"),
+ Arrays.asList(87, "cucu")
+ )
+ );
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+ assertEquals(expected, deserialized);
+ }
+
+ @Test
+ public void testSimpleMap() throws SerDeException{
+ testSimpleMap("string", "tinyint", RecordFieldType.BYTE.getDataType(), createMap((byte)89, (byte)2), objectMap(createMap((byte)89, (byte)2)));
+ testSimpleMap("string", "smallint", RecordFieldType.SHORT.getDataType(), createMap((short)89, (short)209), objectMap(createMap((short)89, (short)209)));
+ testSimpleMap("string", "int", RecordFieldType.INT.getDataType(), createMap(90, 87), objectMap(createMap(90, 87)));
+ testSimpleMap("string", "bigint", RecordFieldType.BIGINT.getDataType(), createMap(87888L, 876L, 123L), objectMap(createMap(87888L, 876L, 123L)));
+ testSimpleMap("string", "boolean", RecordFieldType.BOOLEAN.getDataType(), createMap(false, true, true, false), objectMap(createMap(false, true, true, false)));
+ testSimpleMap("string", "float", RecordFieldType.FLOAT.getDataType(), createMap(1.2f, 8.6f, 0.125f), objectMap(createMap(1.2f, 8.6f, 0.125f)));
+ testSimpleMap("string", "double", RecordFieldType.DOUBLE.getDataType(), createMap(3.142, 8.93), objectMap(createMap(3.142, 8.93)));
+ testSimpleMap("string", "string", RecordFieldType.STRING.getDataType(), createMap("form", "ni", "aje"), objectMap(createMap("form", "ni", "aje")));
+ testSimpleMap("string", "varchar(20)", RecordFieldType.STRING.getDataType(), createMap("niko", "kiza"), objectMap(createMap("niko", "kiza")));
+ testSimpleMap("string", "char(1)", RecordFieldType.CHAR.getDataType(), createMap('a', 'b', 'c'), objectMap(createMap("a", "b", "c")));
+ long now = System.currentTimeMillis();
+ Date hiveDate = new Date();
+ hiveDate.setTimeInMillis(now);
+ Timestamp hiveTs = new Timestamp();
+ hiveTs.setTimeInMillis(now);
+
+ testSimpleMap("string", "date", RecordFieldType.DATE.getDataType(), createMap(new java.sql.Date(now)), objectMap(createMap(hiveDate)));
+ testSimpleMap("string", "timestamp", RecordFieldType.TIMESTAMP.getDataType(), createMap(new java.sql.Timestamp(now)), objectMap(createMap(hiveTs)));
+ testSimpleMap("string", "decimal(10,2)", RecordFieldType.DOUBLE.getDataType(), createMap(45.6, 2345.5), objectMap(createMap(
+ HiveDecimal.create(45.6), HiveDecimal.create(2345.5)
+ )));
+ }
+
+ private Map<String,Object> createMap(Object... keyValues){
+ Map<String,Object> map = new HashMap<>(keyValues.length);
+ for(int i=0; i<keyValues.length; i++){
+ map.put("key." + i, keyValues[i]);
+ }
+ return map;
+ }
+
+ Map<Object,Object> objectMap(Map<String,Object> input){
+ return new HashMap<>(input);
+ }
+
+ private void testSimpleMap(String keyType, String valueType, DataType fieldType, Map<String, Object> fields, Map<Object, Object> expected) throws SerDeException{
+ NiFiRecordSerDe serDe = createSerDe("mapc",
+ "map<" + keyType + "," + valueType + ">"
+ );
+ RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("mapc", RecordFieldType.MAP.getMapDataType(fieldType))
+ ));
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, new HashMap<String, Object>(){
+ {
+ put("mapc", fields);
+ }
+ })));
+ List<Object> desFields = (List<Object>)deserialized;
+ assertEquals(1, desFields.size());
+ Map<Object,Object> map = (Map<Object, Object>)desFields.get(0);
+ for(Map.Entry<Object, Object> entry: expected.entrySet()){
+ assertEquals(entry.getValue(), map.get(entry.getKey()));
+ }
+ }
+
+ @Test
+ public void testStructMap() throws SerDeException{
+ NiFiRecordSerDe serDe = createSerDe(
+ "mapc",
+ "map<string,struct<id:int,balance:decimal(18,2)>>"
+ );
+ RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("id", RecordFieldType.INT.getDataType()),
+ new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+ ));
+ RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("mapc", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema)))
+ ));
+
+ HashMap<String, Object> input = new HashMap<String, Object>() {{
+ put("mapc", new HashMap<String, Object>() {{
+ put("current", new MapRecord(recordSchema, new HashMap<String, Object>() {{
+ put("id", 1);
+ put("balance", 56.9);
+ }}));
+ put("savings", new MapRecord(recordSchema, new HashMap<String, Object>() {{
+ put("id", 2);
+ put("balance", 104.65);
+ }}));
+ }});
+ }};
+
+ Object expected = Collections.singletonList(
+ new HashMap<String, Object>() {{
+ put("current", Arrays.asList(1, HiveDecimal.create(56.9)));
+ put("savings", Arrays.asList(2, HiveDecimal.create(104.65)));
+ }}
+ );
+
+ Object deserialized = serDe.deserialize(new ObjectWritable(new MapRecord(schema, input)));
+
+ assertEquals(expected, deserialized);
+ }
+
+ NiFiRecordSerDe createSerDe(String columnNames, String typeInfo) throws SerDeException{
+ Properties props = new Properties();
+ props.setProperty(serdeConstants.LIST_COLUMNS, columnNames);
+ props.setProperty(serdeConstants.LIST_COLUMN_TYPES, typeInfo);
+ NiFiRecordSerDe serDe = new NiFiRecordSerDe(null, new MockComponentLog("logger", new Object())); //reader isn't used
+ serDe.initialize(null, props); //conf isn't used
+ return serDe;
+ }
+}
\ No newline at end of file
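
The decimal tests above hand plain Java doubles to decimal columns and compare against HiveDecimal values; in the SerDe this is handled by the DECIMAL branch, which keeps BigDecimal inputs exact, converts other Numbers via doubleValue(), and coerces anything else. A minimal sketch of that decision chain follows, using java.math.BigDecimal as a stand-in for HiveDecimal so it runs without Hive on the classpath; the DecimalCoercionSketch name is illustrative.

import java.math.BigDecimal;

/**
 * Standalone sketch of the DECIMAL branch in extractCurrentField():
 * BigDecimal values pass through exactly, other Numbers are converted via
 * doubleValue(), and anything else is coerced (the real code uses
 * DataTypeUtils.toDouble). BigDecimal stands in for Hive's HiveDecimal here.
 */
public class DecimalCoercionSketch {

    static BigDecimal toDecimal(Object fieldValue) {
        if (fieldValue == null) {
            return null;
        }
        if (fieldValue instanceof BigDecimal) {
            return (BigDecimal) fieldValue;                              // exact, no precision loss
        }
        if (fieldValue instanceof Number) {
            return BigDecimal.valueOf(((Number) fieldValue).doubleValue());
        }
        // Fallback: parse whatever string representation was provided
        return new BigDecimal(fieldValue.toString());
    }

    public static void main(String[] args) {
        System.out.println(toDecimal(new BigDecimal("0.45"))); // 0.45
        System.out.println(toDecimal(0.45));                   // 0.45 (via doubleValue)
        System.out.println(toDecimal("2345.50"));              // 2345.50 (parsed)
    }
}
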
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
index 4f49932..137becc 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java
@@ -53,18 +53,24 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -80,6 +86,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;
@@ -204,27 +212,29 @@ public class TestPutHive3Streaming {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/array_of_records.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
processor.setFields(Arrays.asList(new FieldSchema("records",
- serdeConstants.LIST_TYPE_NAME + "<"
- + serdeConstants.MAP_TYPE_NAME + "<"
- + serdeConstants.STRING_TYPE_NAME + ","
- + serdeConstants.STRING_TYPE_NAME + ">>", "")));
+ "array<struct<name:string,age:string>>", "")));
runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
MockRecordParser readerFactory = new MockRecordParser();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
- readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ //add the recordField so that we don't lose the element type data type
+ readerFactory.addSchemaField(recordField);
}
if (recordGenerator == null) {
- Object[] mapArray = new Object[numUsers];
+ //given the schema is an array of records, the array in the
+ //records field needs to contain Record objects
+ MapRecord[] mapArray = new MapRecord[numUsers];
+ ArrayDataType recordsDataType = (ArrayDataType)recordSchema.getField("records").get().getDataType();
+ RecordDataType nestedStructType = (RecordDataType)recordsDataType.getElementType();
for (int i = 0; i < numUsers; i++) {
final int x = i;
Map<String, Object> map = new HashMap<String, Object>() {{
put("name", "name" + x);
put("age", x * 5);
}};
- mapArray[i] = map;
+ mapArray[i] = new MapRecord(nestedStructType.getChildSchema(), map);
}
readerFactory.addRecord((Object)mapArray);
} else {
@@ -757,7 +767,9 @@ public class TestPutHive3Streaming {
MockRecordParser readerFactory = new MockRecordParser();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
- readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+ //add the schema field so that we don't lose the map value data type for
+ //mapc field and element type for listc field
+ readerFactory.addSchemaField(recordField);
}
List<String> enumc = Arrays.asList("SPADES", "HEARTS", "DIAMONDS", "CLUBS");
@@ -960,6 +972,73 @@ public class TestPutHive3Streaming {
}
@Test
+ public void testNestedRecords() throws Exception {
+ runner = TestRunners.newTestRunner(processor);
+ MockRecordParser readerFactory = new MockRecordParser();
+
+ final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/nested_record.avsc"), StandardCharsets.UTF_8);
+ schema = new Schema.Parser().parse(avroSchema);
+
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+ for (final RecordField recordField : recordSchema.getFields()) {
+ readerFactory.addSchemaField(recordField);
+ }
+
+ Map<String,Object> nestedRecordMap = new HashMap<>();
+ nestedRecordMap.put("id", 11088000000001615L);
+ nestedRecordMap.put("x", "Hello World!");
+
+ RecordSchema nestedRecordSchema = AvroTypeUtil.createSchema(schema.getField("myField").schema());
+ MapRecord nestedRecord = new MapRecord(nestedRecordSchema, nestedRecordMap);
+ // This gets added into its spot in the schema, which is already named "myField"
+ readerFactory.addRecord(nestedRecord);
+
+ runner.addControllerService("mock-reader-factory", readerFactory);
+ runner.enableControllerService(readerFactory);
+
+ runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory");
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+ runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+ runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+
+ runner.enqueue("trigger");
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutHive3Streaming.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testValidateNestedMap() throws InitializationException, IOException {
+ final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/nested-map-schema.avsc")), StandardCharsets.UTF_8);
+
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner = TestRunners.newTestRunner(processor);
+ runner.addControllerService("reader", jsonReader);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", validWriter);
+ runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
+ runner.enableControllerService(validWriter);
+
+ final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
+ runner.addControllerService("invalid-writer", invalidWriter);
+ runner.enableControllerService(invalidWriter);
+
+ runner.setProperty(PutHive3Streaming.RECORD_READER, "reader");
+ runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+ runner.setProperty(PutHive3Streaming.DB_NAME, "default");
+ runner.setProperty(PutHive3Streaming.TABLE_NAME, "groups");
+ runner.enqueue(Paths.get("src/test/resources/nested-map-input.json"));
+ runner.run();
+
+ runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0);
+ runner.clearTransferState();
+ }
+
+ @Test
public void cleanup() {
processor.cleanup();
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-input.json b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-input.json
new file mode 100644
index 0000000..ed8dc3b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-input.json
@@ -0,0 +1,18 @@
+[{
+ "images" : [ {
+ "headers" : {
+ "Accept-Ranges" : "bytes",
+ "Server" : "Apache"
+ },
+ "id" : "205f40e3-2675-4c61-abc1-a0aeb609c023"
+ } ]
+},
+{
+ "images" : [ {
+ "headers" : {
+ "Accept-Ranges" : 4,
+ "Server" : 10
+ },
+ "id" : "205f40e3-2675-4c61-abc1-a0aeb609c023"
+ } ]
+}]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-schema.avsc b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-schema.avsc
new file mode 100644
index 0000000..3cb1a92
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/resources/nested-map-schema.avsc
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ {
+ "type": "record",
+ "name": "example",
+ "fields": [
+ {
+ "name": "images",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "images",
+ "fields": [
+ {
+ "name": "id",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "headers",
+ "type": {
+ "type": "map",
+ "values": "string"
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file