You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/01 20:42:34 UTC
[nifi] 03/18: NIFI-6105: Fix handling of arrays of records/maps in
record utilities
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch NIFI-6169-RC1
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 77bf89c971ec1948e3268e0478d02ec666d672b7
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 13 14:34:16 2019 +0900
NIFI-6105: Fix handling of arrays of records/maps in record utilities
Refactored to use the same check logic for Record and Map types
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3367
---
.../serialization/record/util/DataTypeUtils.java | 96 ++++++++++------------
1 file changed, 42 insertions(+), 54 deletions(-)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index fb6cdbd..63db142 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -201,59 +201,8 @@ public class DataTypeUtils {
case LONG:
return isLongTypeCompatible(value);
case RECORD: {
- if (value == null) {
- return false;
- }
-
- // value may be a Map even when type is RECORD
- if (value instanceof Map) {
- final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
- if (schema == null) {
- return true;
- }
- Map<String, Object> record = ((Map<String, Object>) value);
- for (final RecordField childField : schema.getFields()) {
- final Object childValue = record.get(childField.getFieldName());
- if (childValue == null && !childField.isNullable()) {
- logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
- return false;
- }
- if (childValue == null) {
- continue; // consider compatible
- }
-
- if (!isCompatibleDataType(childValue, childField.getDataType())) {
- return false;
- }
- }
- return true;
- }
- if (!(value instanceof Record)) {
- return false;
- }
-
final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
- if (schema == null) {
- return true;
- }
-
- final Record record = (Record) value;
- for (final RecordField childField : schema.getFields()) {
- final Object childValue = record.getValue(childField);
- if (childValue == null && !childField.isNullable()) {
- logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
- return false;
- }
- if (childValue == null) {
- continue; // consider compatible
- }
-
- if (!isCompatibleDataType(childValue, childField.getDataType())) {
- return false;
- }
- }
-
- return true;
+ return isRecordTypeCompatible(schema, value);
}
case SHORT:
return isShortTypeCompatible(value);
@@ -539,8 +488,47 @@ public class DataTypeUtils {
return RecordFieldType.RECORD.getRecordDataType(schema);
}
- public static boolean isRecordTypeCompatible(final Object value) {
- return value instanceof Record;
+ /**
+ * Check if the given record structured object is compatible with the schema.
+ * @param schema record schema, schema validation will not be performed if schema is null
+ * @param value the record structured object, i.e. Record or Map
+ * @return True if the object is compatible with the schema
+ */
+ private static boolean isRecordTypeCompatible(RecordSchema schema, Object value) {
+
+ if (value == null) {
+ return false;
+ }
+
+ if (!(value instanceof Record) && !(value instanceof Map)) {
+ return false;
+ }
+
+ if (schema == null) {
+ return true;
+ }
+
+ for (final RecordField childField : schema.getFields()) {
+ final Object childValue;
+ if (value instanceof Record) {
+ childValue = ((Record) value).getValue(childField);
+ } else {
+ childValue = ((Map) value).get(childField.getFieldName());
+ }
+
+ if (childValue == null && !childField.isNullable()) {
+ logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
+ return false;
+ }
+ if (childValue == null) {
+ continue; // consider compatible
+ }
+
+ if (!isCompatibleDataType(childValue, childField.getDataType())) {
+ return false;
+ }
+ }
+ return true;
}
public static Object[] toArray(final Object value, final String fieldName, final DataType elementDataType) {