You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/03/13 15:50:20 UTC

[nifi] 01/02: NIFI-6105: Fix handling of arrays of records/maps in record utilities

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f91311da9d759fb5712ee8121d54d27a7deea236
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Mar 6 09:53:13 2019 -0500

    NIFI-6105: Fix handling of arrays of records/maps in record utilities
    
    This closes #3353.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../serialization/record/util/DataTypeUtils.java   | 27 +++++++++++++++++
 .../serialization/record/TestDataTypeUtils.java    | 35 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index a399f67..fb6cdbd 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -204,6 +204,30 @@ public class DataTypeUtils {
                 if (value == null) {
                     return false;
                 }
+
+                // value may be a Map even when type is RECORD
+                if (value instanceof Map) {
+                    final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
+                    if (schema == null) {
+                        return true;
+                    }
+                    Map<String, Object> record = ((Map<String, Object>) value);
+                    for (final RecordField childField : schema.getFields()) {
+                        final Object childValue = record.get(childField.getFieldName());
+                        if (childValue == null && !childField.isNullable()) {
+                            logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
+                            return false;
+                        }
+                        if (childValue == null) {
+                            continue; // consider compatible
+                        }
+
+                        if (!isCompatibleDataType(childValue, childField.getDataType())) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
                 if (!(value instanceof Record)) {
                     return false;
                 }
@@ -687,6 +711,9 @@ public class DataTypeUtils {
             return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType());
         } else if (dataType != null && isScalarValue(dataType, value)) {
             return value;
+        } else if (value instanceof Object[] && dataType instanceof ArrayDataType) {
+            // This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object
+            return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType());
         }
 
         throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported");
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 45b65b4..cef0eec 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -108,6 +108,34 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testConvertArrayOfRecordsToJavaArray() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("stringField", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("intField", RecordFieldType.INT.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values1 = new HashMap<>();
+        values1.put("stringField", "hello");
+        values1.put("intField", 5);
+        final Record inputRecord1 = new MapRecord(schema, values1);
+
+        final Map<String, Object> values2 = new HashMap<>();
+        values2.put("stringField", "world");
+        values2.put("intField", 50);
+        final Record inputRecord2 = new MapRecord(schema, values2);
+
+        Object[] recordArray = {inputRecord1, inputRecord2};
+        Object resultObj = DataTypeUtils.convertRecordFieldtoObject(recordArray, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(schema)));
+        assertNotNull(resultObj);
+        assertTrue(resultObj instanceof Object[]);
+        Object[] resultArray = (Object[]) resultObj;
+        for(Object o : resultArray) {
+            assertTrue(o instanceof Map);
+        }
+    }
+
+    @Test
     @SuppressWarnings("unchecked")
     public void testConvertRecordFieldToObject() {
         assertNull(DataTypeUtils.convertRecordFieldtoObject(null, null));
@@ -252,4 +280,11 @@ public class TestDataTypeUtils {
             }
         }
     }
+
+    @Test
+    public void testIsCompatibleDataTypeMap() {
+        Map<String,Object> testMap = new HashMap<>();
+        testMap.put("Hello", "World");
+        assertTrue(DataTypeUtils.isCompatibleDataType(testMap, RecordFieldType.RECORD.getDataType()));
+    }
 }