Posted to commits@nifi.apache.org by ma...@apache.org on 2019/03/13 15:50:19 UTC

[nifi] branch master updated (35d1cac -> 2846d3c)

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 35d1cac  Documentation :  ConsumerKafka_2_0 - updated configuration details re… (#3360)
     new f91311d  NIFI-6105: Fix handling of arrays of records/maps in record utilities
     new 2846d3c  NIFI-6105: Fix handling of arrays of records/maps in record utilities

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../serialization/record/util/DataTypeUtils.java   | 75 +++++++++++++---------
 .../serialization/record/TestDataTypeUtils.java    | 35 ++++++++++
 2 files changed, 80 insertions(+), 30 deletions(-)


[nifi] 01/02: NIFI-6105: Fix handling of arrays of records/maps in record utilities

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit f91311da9d759fb5712ee8121d54d27a7deea236
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Mar 6 09:53:13 2019 -0500

    NIFI-6105: Fix handling of arrays of records/maps in record utilities
    
    This closes #3353.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../serialization/record/util/DataTypeUtils.java   | 27 +++++++++++++++++
 .../serialization/record/TestDataTypeUtils.java    | 35 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)
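
As background for the diff below, here is a minimal sketch of the behavior this commit adds, modeled on the new unit tests in TestDataTypeUtils. The class and method names are taken from the diff; the wrapper class, literal values, and println calls are illustrative only:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    // Illustrative wrapper class (not part of the commit)
    public class Nifi6105Sketch {
        public static void main(String[] args) {
            // Schema with one string field and one int field
            final List<RecordField> fields = new ArrayList<>();
            fields.add(new RecordField("stringField", RecordFieldType.STRING.getDataType()));
            fields.add(new RecordField("intField", RecordFieldType.INT.getDataType()));
            final RecordSchema schema = new SimpleRecordSchema(fields);

            // An array of Records typed as ARRAY<RECORD> is now converted to an
            // Object[] whose elements are plain java.util.Map instances
            final Map<String, Object> values = new HashMap<>();
            values.put("stringField", "hello");
            values.put("intField", 5);
            final Record record = new MapRecord(schema, values);
            final Object converted = DataTypeUtils.convertRecordFieldtoObject(
                    new Object[]{record},
                    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(schema)));
            System.out.println(((Object[]) converted)[0] instanceof Map); // true

            // A raw Map is now also accepted as compatible with a RECORD data type
            final Map<String, Object> rawMap = new HashMap<>();
            rawMap.put("Hello", "World");
            System.out.println(DataTypeUtils.isCompatibleDataType(rawMap, RecordFieldType.RECORD.getDataType())); // true
        }
    }
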

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index a399f67..fb6cdbd 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -204,6 +204,30 @@ public class DataTypeUtils {
                 if (value == null) {
                     return false;
                 }
+
+                // value may be a Map even when type is RECORD
+                if (value instanceof Map) {
+                    final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
+                    if (schema == null) {
+                        return true;
+                    }
+                    Map<String, Object> record = ((Map<String, Object>) value);
+                    for (final RecordField childField : schema.getFields()) {
+                        final Object childValue = record.get(childField.getFieldName());
+                        if (childValue == null && !childField.isNullable()) {
+                            logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
+                            return false;
+                        }
+                        if (childValue == null) {
+                            continue; // consider compatible
+                        }
+
+                        if (!isCompatibleDataType(childValue, childField.getDataType())) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }
                 if (!(value instanceof Record)) {
                     return false;
                 }
@@ -687,6 +711,9 @@ public class DataTypeUtils {
             return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType());
         } else if (dataType != null && isScalarValue(dataType, value)) {
             return value;
+        } else if (value instanceof Object[] && dataType instanceof ArrayDataType) {
+            // This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object
+            return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType());
         }
 
         throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported");
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
index 45b65b4..cef0eec 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java
@@ -108,6 +108,34 @@ public class TestDataTypeUtils {
     }
 
     @Test
+    public void testConvertArrayOfRecordsToJavaArray() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("stringField", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("intField", RecordFieldType.INT.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values1 = new HashMap<>();
+        values1.put("stringField", "hello");
+        values1.put("intField", 5);
+        final Record inputRecord1 = new MapRecord(schema, values1);
+
+        final Map<String, Object> values2 = new HashMap<>();
+        values2.put("stringField", "world");
+        values2.put("intField", 50);
+        final Record inputRecord2 = new MapRecord(schema, values2);
+
+        Object[] recordArray = {inputRecord1, inputRecord2};
+        Object resultObj = DataTypeUtils.convertRecordFieldtoObject(recordArray, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(schema)));
+        assertNotNull(resultObj);
+        assertTrue(resultObj instanceof Object[]);
+        Object[] resultArray = (Object[]) resultObj;
+        for(Object o : resultArray) {
+            assertTrue(o instanceof Map);
+        }
+    }
+
+    @Test
     @SuppressWarnings("unchecked")
     public void testConvertRecordFieldToObject() {
         assertNull(DataTypeUtils.convertRecordFieldtoObject(null, null));
@@ -252,4 +280,11 @@ public class TestDataTypeUtils {
             }
         }
     }
+
+    @Test
+    public void testIsCompatibleDataTypeMap() {
+        Map<String,Object> testMap = new HashMap<>();
+        testMap.put("Hello", "World");
+        assertTrue(DataTypeUtils.isCompatibleDataType(testMap, RecordFieldType.RECORD.getDataType()));
+    }
 }


[nifi] 02/02: NIFI-6105: Fix handling of arrays of records/maps in record utilities

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2846d3c3c6e454f7960763bd69e36e879684a033
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 13 14:34:16 2019 +0900

    NIFI-6105: Fix handling of arrays of records/maps in record utilities
    
    Refactored to use the same check logic for Record and Map types
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3367
---
 .../serialization/record/util/DataTypeUtils.java   | 96 ++++++++++------------
 1 file changed, 42 insertions(+), 54 deletions(-)
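
Before the diff, a rough sketch of what the refactor means for callers: the RECORD branch of isCompatibleDataType now routes both Record and Map values through the same private isRecordTypeCompatible(schema, value) helper, so the two calls below take an identical check path. Names come from the diff; the wrapper class and sample data are illustrative only:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.util.DataTypeUtils;

    // Illustrative wrapper class (not part of the commit)
    public class Nifi6105RefactorSketch {
        public static void main(String[] args) {
            final List<RecordField> fields = new ArrayList<>();
            fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
            final RecordSchema schema = new SimpleRecordSchema(fields);
            final DataType recordType = RecordFieldType.RECORD.getRecordDataType(schema);

            final Map<String, Object> asMap = new HashMap<>();
            asMap.put("name", "nifi");
            final Record asRecord = new MapRecord(schema, asMap);

            // Both calls delegate to the shared isRecordTypeCompatible(schema, value) check
            System.out.println(DataTypeUtils.isCompatibleDataType(asRecord, recordType)); // true
            System.out.println(DataTypeUtils.isCompatibleDataType(asMap, recordType));    // true
        }
    }
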

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index fb6cdbd..63db142 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -201,59 +201,8 @@ public class DataTypeUtils {
             case LONG:
                 return isLongTypeCompatible(value);
             case RECORD: {
-                if (value == null) {
-                    return false;
-                }
-
-                // value may be a Map even when type is RECORD
-                if (value instanceof Map) {
-                    final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
-                    if (schema == null) {
-                        return true;
-                    }
-                    Map<String, Object> record = ((Map<String, Object>) value);
-                    for (final RecordField childField : schema.getFields()) {
-                        final Object childValue = record.get(childField.getFieldName());
-                        if (childValue == null && !childField.isNullable()) {
-                            logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
-                            return false;
-                        }
-                        if (childValue == null) {
-                            continue; // consider compatible
-                        }
-
-                        if (!isCompatibleDataType(childValue, childField.getDataType())) {
-                            return false;
-                        }
-                    }
-                    return true;
-                }
-                if (!(value instanceof Record)) {
-                    return false;
-                }
-
                 final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
-                if (schema == null) {
-                    return true;
-                }
-
-                final Record record = (Record) value;
-                for (final RecordField childField : schema.getFields()) {
-                    final Object childValue = record.getValue(childField);
-                    if (childValue == null && !childField.isNullable()) {
-                        logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
-                        return false;
-                    }
-                    if (childValue == null) {
-                        continue; // consider compatible
-                    }
-
-                    if (!isCompatibleDataType(childValue, childField.getDataType())) {
-                        return false;
-                    }
-                }
-
-                return true;
+                return isRecordTypeCompatible(schema, value);
             }
             case SHORT:
                 return isShortTypeCompatible(value);
@@ -539,8 +488,47 @@ public class DataTypeUtils {
         return RecordFieldType.RECORD.getRecordDataType(schema);
     }
 
-    public static boolean isRecordTypeCompatible(final Object value) {
-        return value instanceof Record;
+    /**
+     * Check if the given record structured object is compatible with the schema.
+     * @param schema record schema, schema validation will not be performed if schema is null
+     * @param value the record structured object, i.e. Record or Map
+     * @return True if the object is compatible with the schema
+     */
+    private static boolean isRecordTypeCompatible(RecordSchema schema, Object value) {
+
+        if (value == null) {
+            return false;
+        }
+
+        if (!(value instanceof Record) && !(value instanceof Map)) {
+            return false;
+        }
+
+        if (schema == null) {
+            return true;
+        }
+
+        for (final RecordField childField : schema.getFields()) {
+            final Object childValue;
+            if (value instanceof Record) {
+                childValue = ((Record) value).getValue(childField);
+            } else {
+                childValue = ((Map) value).get(childField.getFieldName());
+            }
+
+            if (childValue == null && !childField.isNullable()) {
+                logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
+                return false;
+            }
+            if (childValue == null) {
+                continue; // consider compatible
+            }
+
+            if (!isCompatibleDataType(childValue, childField.getDataType())) {
+                return false;
+            }
+        }
+        return true;
     }
 
     public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) {