You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/01 20:42:34 UTC

[nifi] 03/18: NIFI-6105: Fix handling of arrays of records/maps in record utilities

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch NIFI-6169-RC1
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 77bf89c971ec1948e3268e0478d02ec666d672b7
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 13 14:34:16 2019 +0900

    NIFI-6105: Fix handling of arrays of records/maps in record utilities
    
    Refactored to use the same check logic for Record and Map types
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3367
---
 .../serialization/record/util/DataTypeUtils.java   | 96 ++++++++++------------
 1 file changed, 42 insertions(+), 54 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index fb6cdbd..63db142 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -201,59 +201,8 @@ public class DataTypeUtils {
             case LONG:
                 return isLongTypeCompatible(value);
             case RECORD: {
-                if (value == null) {
-                    return false;
-                }
-
-                // value may be a Map even when type is RECORD
-                if (value instanceof Map) {
-                    final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
-                    if (schema == null) {
-                        return true;
-                    }
-                    Map<String, Object> record = ((Map<String, Object>) value);
-                    for (final RecordField childField : schema.getFields()) {
-                        final Object childValue = record.get(childField.getFieldName());
-                        if (childValue == null && !childField.isNullable()) {
-                            logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
-                            return false;
-                        }
-                        if (childValue == null) {
-                            continue; // consider compatible
-                        }
-
-                        if (!isCompatibleDataType(childValue, childField.getDataType())) {
-                            return false;
-                        }
-                    }
-                    return true;
-                }
-                if (!(value instanceof Record)) {
-                    return false;
-                }
-
                 final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
-                if (schema == null) {
-                    return true;
-                }
-
-                final Record record = (Record) value;
-                for (final RecordField childField : schema.getFields()) {
-                    final Object childValue = record.getValue(childField);
-                    if (childValue == null && !childField.isNullable()) {
-                        logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
-                        return false;
-                    }
-                    if (childValue == null) {
-                        continue; // consider compatible
-                    }
-
-                    if (!isCompatibleDataType(childValue, childField.getDataType())) {
-                        return false;
-                    }
-                }
-
-                return true;
+                return isRecordTypeCompatible(schema, value);
             }
             case SHORT:
                 return isShortTypeCompatible(value);
@@ -539,8 +488,47 @@ public class DataTypeUtils {
         return RecordFieldType.RECORD.getRecordDataType(schema);
     }
 
-    public static boolean isRecordTypeCompatible(final Object value) {
-        return value instanceof Record;
+    /**
+     * Check if the given record structured object compatible with the schema.
+     * @param schema record schema, schema validation will not be performed if schema is null
+     * @param value the record structured object, i.e. Record or Map
+     * @return True if the object is compatible with the schema
+     */
+    private static boolean isRecordTypeCompatible(RecordSchema schema, Object value) {
+
+        if (value == null) {
+            return false;
+        }
+
+        if (!(value instanceof Record) && !(value instanceof Map)) {
+            return false;
+        }
+
+        if (schema == null) {
+            return true;
+        }
+
+        for (final RecordField childField : schema.getFields()) {
+            final Object childValue;
+            if (value instanceof Record) {
+                childValue = ((Record) value).getValue(childField);
+            } else {
+                childValue = ((Map) value).get(childField.getFieldName());
+            }
+
+            if (childValue == null && !childField.isNullable()) {
+                logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
+                return false;
+            }
+            if (childValue == null) {
+                continue; // consider compatible
+            }
+
+            if (!isCompatibleDataType(childValue, childField.getDataType())) {
+                return false;
+            }
+        }
+        return true;
     }
 
     public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) {