You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/09/20 02:54:10 UTC

[nifi] branch master updated: NIFI-6691: This closes #3754. Fixed bug in MapRecord's algorithm for incorporating Inactive Fields, by recursing into any 'child fields' that contain records; also fixed bug in UpdateRecord that caused it to incorrectly map the first result to all elements if the key of the property points to an array

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new d0371a5  NIFI-6691: This closes #3754. Fixed bug in MapRecord's algorithm for incorporating Inactive Fields, by recursing into any 'child fields' that contain records; also fixed bug in UpdateRecord that caused it to incorrectly map the first result to all elements if the key of the property points to an array
d0371a5 is described below

commit d0371a5ef1f1abe59b7f1de310f21fb93e8fbacb
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Sep 19 16:22:19 2019 -0400

    NIFI-6691: This closes #3754. Fixed bug in MapRecord's algorithm for incorporating Inactive Fields, by recursing into any 'child fields' that contain records; also fixed bug in UpdateRecord that caused it to incorrectly map the first result to all elements if the key of the property points to an array
---
 .../nifi/serialization/record/MapRecord.java       | 121 +++++++++++++++++++--
 .../nifi-standard-processors/pom.xml               |  20 ++--
 .../nifi/processors/standard/UpdateRecord.java     |   2 -
 .../nifi/processors/standard/TestUpdateRecord.java |  42 +++++--
 .../TestUpdateRecord/input/addresses.json          |  11 ++
 .../TestUpdateRecord/output/full-addresses.json    |  13 +++
 6 files changed, 182 insertions(+), 27 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index 2f8a766..ce8708c 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -20,7 +20,9 @@ package org.apache.nifi.serialization.record;
 import org.apache.nifi.serialization.SchemaValidationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
@@ -434,22 +436,125 @@ public class MapRecord implements Record {
         this.schema = DataTypeUtils.merge(this.schema, other);
     }
 
+
+
+
     @Override
     public void incorporateInactiveFields() {
-        if (inactiveFields == null) {
-            return;
+        final List<RecordField> updatedFields = new ArrayList<>();
+
+        for (final RecordField field : schema.getFields()) {
+            updatedFields.add(getUpdatedRecordField(field));
+        }
+
+        if (inactiveFields != null) {
+            for (final RecordField field : inactiveFields) {
+                if (!updatedFields.contains(field)) {
+                    updatedFields.add(field);
+                }
+            }
+        }
+
+        this.schema = new SimpleRecordSchema(updatedFields);
+    }
+
+    private RecordField getUpdatedRecordField(final RecordField field) {
+        final DataType dataType = field.getDataType();
+        final RecordFieldType fieldType = dataType.getFieldType();
+
+        if (isSimpleType(fieldType)) {
+            return field;
+        }
+
+        final Object value = getValue(field);
+        if (value == null) {
+            return field;
+        }
+
+        if (fieldType == RecordFieldType.RECORD && value instanceof Record) {
+            final Record childRecord = (Record) value;
+            childRecord.incorporateInactiveFields();
+
+            final RecordSchema definedChildSchema = ((RecordDataType) dataType).getChildSchema();
+            final RecordSchema actualChildSchema = childRecord.getSchema();
+            final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema);
+            final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema);
+
+            final RecordField updatedField = new RecordField(field.getFieldName(), combinedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
+            return updatedField;
+        }
+
+        if (fieldType == RecordFieldType.ARRAY && value instanceof Object[]) {
+            final DataType elementType = ((ArrayDataType) dataType).getElementType();
+            final RecordFieldType elementFieldType = elementType.getFieldType();
+
+            if (elementFieldType == RecordFieldType.RECORD) {
+                final Object[] array = (Object[]) value;
+                RecordSchema mergedSchema = ((RecordDataType) elementType).getChildSchema();
+
+                for (final Object element : array) {
+                    if (element == null) {
+                        continue;
+                    }
+
+                    final Record record = (Record) element;
+                    record.incorporateInactiveFields();
+                    mergedSchema = DataTypeUtils.merge(mergedSchema, record.getSchema());
+                }
+
+                final DataType mergedRecordType = RecordFieldType.RECORD.getRecordDataType(mergedSchema);
+                final DataType mergedDataType = RecordFieldType.ARRAY.getArrayDataType(mergedRecordType);
+                final RecordField updatedField = new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
+                return updatedField;
+            }
+
+            return field;
         }
 
-        final List<RecordField> allFields = new ArrayList<>(schema.getFieldCount() + inactiveFields.size());
-        allFields.addAll(schema.getFields());
+        if (fieldType == RecordFieldType.CHOICE) {
+            final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+            final List<DataType> possibleTypes = choiceDataType.getPossibleSubTypes();
+
+            final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType);
+            if (chosenDataType.getFieldType() != RecordFieldType.RECORD || !(value instanceof Record)) {
+                return field;
+            }
 
-        for (final RecordField field : inactiveFields) {
-            if (!allFields.contains(field)) {
-                allFields.add(field);
+            final RecordDataType recordDataType = (RecordDataType) chosenDataType;
+            final Record childRecord = (Record) value;
+            childRecord.incorporateInactiveFields();
+
+            final RecordSchema definedChildSchema = recordDataType.getChildSchema();
+            final RecordSchema actualChildSchema = childRecord.getSchema();
+            final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema);
+            final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema);
+
+            final List<DataType> updatedPossibleTypes = new ArrayList<>(possibleTypes.size());
+            for (final DataType possibleType : possibleTypes) {
+                if (possibleType.equals(chosenDataType)) {
+                    updatedPossibleTypes.add(combinedDataType);
+                } else {
+                    updatedPossibleTypes.add(possibleType);
+                }
             }
+
+            final DataType mergedDataType = RecordFieldType.CHOICE.getChoiceDataType(updatedPossibleTypes);
+            return new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable());
+        }
+
+        return field;
+    }
+
+    private boolean isSimpleType(final RecordFieldType fieldType) {
+        switch (fieldType) {
+            case ARRAY:
+            case RECORD:
+            case MAP:
+            case CHOICE:
+                return false;
         }
 
-        this.schema = new SimpleRecordSchema(allFields);
+        return true;
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index ed51c40..5a3cee4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -1,13 +1,13 @@
 <?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-    license agreements. See the NOTICE file distributed with this work for additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    You under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
     language governing permissions and limitations under the License. -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
@@ -554,6 +554,8 @@
                         <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json</exclude>
                         <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json</exclude>
                         <exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-streets.json</exclude>
+                        <exclude>src/test/resources/TestUpdateRecord/input/addresses.json</exclude>
+                        <exclude>src/test/resources/TestUpdateRecord/output/full-addresses.json</exclude>
                         <exclude>src/test/resources/xxe_template.xml</exclude>
                         <exclude>src/test/resources/xxe_from_report.xml</exclude>
                         <exclude>src/test/resources/TestForkRecord/single-element-nested-array-strings.json</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 20e5dd1..5af5262 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -203,8 +203,6 @@ public class UpdateRecord extends AbstractRecordProcessor {
             final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
             final Object replacementObject = getReplacementObject(selectedFields);
             updateFieldValue(fieldVal, replacementObject);
-
-            record = updateRecord(destinationFieldValues, selectedFields, record);
         }
 
         return record;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index e732695..daf57a8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -17,19 +17,12 @@
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collections;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -39,6 +32,14 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestUpdateRecord {
 
     private TestRunner runner;
@@ -159,6 +160,31 @@ public class TestUpdateRecord {
     }
 
     @Test
+    public void testConcatWithArrayInferredSchema() throws InitializationException, IOException {
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.enableControllerService(jsonWriter);
+
+        runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/addresses.json"));
+        runner.setProperty("/addresses[*]/full", "concat(../street, ' ', ../city, ' ', ../state)");
+        runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
+        final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/full-addresses.json")));
+        runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
+    }
+
+    @Test
     public void testChangingSchema() throws InitializationException, IOException {
         final JsonTreeReader jsonReader = new JsonTreeReader();
         runner.addControllerService("reader", jsonReader);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/addresses.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/addresses.json
new file mode 100644
index 0000000..3e0abb0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/addresses.json
@@ -0,0 +1,11 @@
+[ {
+  "addresses" : [ {
+    "street" : "1234 My Street",
+    "city" : "My City",
+    "state" : "MS"
+  }, {
+    "street" : "4321 Your Street",
+    "city" : "Your City",
+    "state" : "YS"
+  } ]
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/full-addresses.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/full-addresses.json
new file mode 100644
index 0000000..bf45236
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/full-addresses.json
@@ -0,0 +1,13 @@
+[ {
+  "addresses" : [ {
+    "street" : "1234 My Street",
+    "city" : "My City",
+    "state" : "MS",
+    "full" : "1234 My Street My City MS"
+  }, {
+    "street" : "4321 Your Street",
+    "city" : "Your City",
+    "state" : "YS",
+    "full" : "4321 Your Street Your City YS"
+  } ]
+} ]
\ No newline at end of file