You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/03/17 12:47:56 UTC

[nifi] 20/47: NIFI-7197 - In-place replacement in LookupRecord processor

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.11.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit c82c1db628c4f8191fb965f1ada821c1bb3fbc55
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Tue Feb 25 08:53:36 2020 -0800

    NIFI-7197 - In-place replacement in LookupRecord processor
    
    This closes #4088
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../nifi-standard-processors/pom.xml               |   2 +
 .../nifi/processors/standard/LookupRecord.java     | 126 ++++++++++--
 .../additionalDetails.html                         | 215 +++++++++++++++++++++
 .../nifi/processors/standard/TestLookupRecord.java |  89 +++++++++
 .../TestLookupRecord/lookup-array-input.json       |  29 +++
 .../TestLookupRecord/lookup-array-output.json      |   1 +
 6 files changed, 446 insertions(+), 16 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2cd98d6..896c85f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -581,6 +581,8 @@
                         <exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
                         <exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
                         <exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
+                        <exclude>src/test/resources/TestLookupRecord/lookup-array-input.json</exclude>
+                        <exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 23d1325..28705cc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -105,6 +105,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
     static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields",
         "All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path.");
 
+    static final AllowableValue USE_PROPERTY = new AllowableValue("use-property", "Use Property",
+            "The \"Result RecordPath\" property will be used to determine which part of the record should be updated with the value returned by the Lookup Service");
+    static final AllowableValue REPLACE_EXISTING_VALUES = new AllowableValue("replace-existing-values", "Replace Existing Values",
+            "The \"Result RecordPath\" property will be ignored and the lookup service must be a single simple key lookup service. Every dynamic property value should "
+            + "be a record path. For each dynamic property, the value contained in the field corresponding to the record path will be used as the key in the Lookup "
+            + "Service and the value returned by the Lookup Service will be used to replace the existing value. It is possible to configure multiple dynamic properties "
+            + "to replace multiple values in one execution. This strategy only supports simple types replacements (strings, integers, etc).");
+
     static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
         .name("lookup-service")
         .displayName("Lookup Service")
@@ -144,6 +152,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         .required(true)
         .build();
 
+    static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
+        .name("record-update-strategy")
+        .displayName("Record Update Strategy")
+        .description("This property defines the strategy to use when updating the record with the value returned by the Lookup Service.")
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .allowableValues(REPLACE_EXISTING_VALUES, USE_PROPERTY)
+        .defaultValue(USE_PROPERTY.getValue())
+        .required(true)
+        .build();
+
     static final Relationship REL_MATCHED = new Relationship.Builder()
         .name("matched")
         .description("All records for which the lookup returns a value will be routed to this relationship")
@@ -182,6 +200,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         properties.add(RESULT_RECORD_PATH);
         properties.add(ROUTING_STRATEGY);
         properties.add(RESULT_CONTENTS);
+        properties.add(REPLACEMENT_STRATEGY);
         return properties;
     }
 
@@ -214,24 +233,37 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         }
 
         final Set<String> requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
-        final Set<String> missingKeys = requiredKeys.stream()
-            .filter(key -> !dynamicPropNames.contains(key))
-            .collect(Collectors.toSet());
 
-        if (!missingKeys.isEmpty()) {
-            final List<ValidationResult> validationResults = new ArrayList<>();
-            for (final String missingKey : missingKeys) {
-                final ValidationResult result = new ValidationResult.Builder()
-                    .subject(missingKey)
-                    .valid(false)
-                    .explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey
-                        + "'. Please add a new property to this Processor with a name '" + missingKey
-                        + "' and provide a RecordPath that can be used to retrieve the appropriate value.")
-                    .build();
-                validationResults.add(result);
+        if(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue())) {
+            // it must be a single key lookup service
+            if(requiredKeys.size() != 1) {
+                return Collections.singleton(new ValidationResult.Builder()
+                        .subject(LOOKUP_SERVICE.getDisplayName())
+                        .valid(false)
+                        .explanation("When using \"" + REPLACE_EXISTING_VALUES.getDisplayName() + "\" as Record Update Strategy, "
+                                + "only a Lookup Service requiring a single key can be used.")
+                        .build());
+            }
+        } else {
+            final Set<String> missingKeys = requiredKeys.stream()
+                .filter(key -> !dynamicPropNames.contains(key))
+                .collect(Collectors.toSet());
+
+            if (!missingKeys.isEmpty()) {
+                final List<ValidationResult> validationResults = new ArrayList<>();
+                for (final String missingKey : missingKeys) {
+                    final ValidationResult result = new ValidationResult.Builder()
+                        .subject(missingKey)
+                        .valid(false)
+                        .explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey
+                            + "'. Please add a new property to this Processor with a name '" + missingKey
+                            + "' and provide a RecordPath that can be used to retrieve the appropriate value.")
+                        .build();
+                    validationResults.add(result);
+                }
+
+                return validationResults;
             }
-
-            return validationResults;
         }
 
         return Collections.emptyList();
@@ -263,6 +295,68 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
     protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
         final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
 
+        final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue());
+
+        if(isInPlaceReplacement) {
+            return doInPlaceReplacement(record, flowFile, context, flowFileContext);
+        } else {
+            return doResultPathReplacement(record, flowFile, context, flowFileContext);
+        }
+
+    }
+
+    private Set<Relationship> doInPlaceReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
+
+        final String lookupKey = (String) context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys().iterator().next();
+
+        final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
+        final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
+
+        for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
+            final String coordinateKey = entry.getKey();
+            final RecordPath recordPath = entry.getValue();
+
+            final RecordPathResult pathResult = recordPath.evaluate(record);
+            final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
+                .filter(fieldVal -> fieldVal.getValue() != null)
+                .collect(Collectors.toList());
+
+            if (lookupFieldValues.isEmpty()) {
+                final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
+                return rels;
+            }
+
+            for (FieldValue fieldValue : lookupFieldValues) {
+                final Object coordinateValue = (fieldValue.getValue() instanceof Number || fieldValue.getValue() instanceof Boolean)
+                        ? fieldValue.getValue() : DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+                lookupCoordinates.put(lookupKey, coordinateValue);
+
+                final Optional<?> lookupValueOption;
+                try {
+                    lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
+                } catch (final Exception e) {
+                    throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
+                }
+
+                if (!lookupValueOption.isPresent()) {
+                    final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                    return rels;
+                }
+
+                final Object lookupValue = lookupValueOption.get();
+
+                final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
+                fieldValue.updateValue(lookupValue, inferredDataType);
+
+            }
+        }
+
+        final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
+        return rels;
+    }
+
+    private Set<Relationship> doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
         final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
         final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html
new file mode 100644
index 0000000..df83708
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html
@@ -0,0 +1,215 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>LookupRecord</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+    	<p>
+    		LookupRecord makes use of the NiFi <a href="../../../../../html/record-path-guide.html">
+    		RecordPath Domain-Specific Language (DSL)</a> to allow the user to indicate which field(s), 
+    		depending on the Record Update Strategy, in the Record should be updated. The Record will 
+    		be updated using the value returned by the provided Lookup Service.
+    	</p>
+
+		<h3>Record Update Strategy - Use Property</h3>
+		
+    	<p>
+    		In this case, the user should add, to the Processor's configuration, as many User-defined 
+    		Properties as required by the Lookup Service to form the lookup coordinates. The name of 
+    		the properties should match the names expected by the Lookup Service.
+    	</p>
+		
+    	<p>
+    		The field evaluated using the path configured in the "Result RecordPath" property will be 
+    		the field updated with the value returned by the Lookup Service.
+    	</p>
+    	
+    	<p>
+    		Let's assume a Simple Key Value Lookup Service containing the following key/value pairs:
+    	</p>
+    	
+<code>
+<pre>
+FR => France
+CA => Canada
+</pre>
+</code>
+		
+    	<p>
+    		Let's assume the following JSON with three records as input:
+    	</p>
+    	
+<code>
+<pre>
+[
+	{
+		"country": null,
+		"code": "FR"
+	}, {
+		"country": null,
+		"code": "CA"
+	}, {
+		"country": null,
+		"code": "JP"
+	}
+]
+</pre>
+</code>
+
+    	<p>
+    		The processor is configured with "Use Property" as "Record Update Strategy", the "Result 
+    		RecordPath" is configured with "/country" and a user-defined property is added with the 
+    		name "key" (as required by this Lookup Service) and the value "/code".
+    	</p>
+
+    	<p>
+    		When triggered, the processor will look for the value associated to the "/code" path and 
+    		will use the value as the "key" of the Lookup Service. The value returned by the Lookup 
+    		Service will be used to update the value corresponding to "/country". With the above 
+    		examples, it will produce:
+    	</p>
+
+<code>
+<pre>
+[
+	{
+		"country": "France",
+		"code": "FR"
+	}, {
+		"country": "Canada",
+		"code": "CA"
+	}, {
+		"country": null,
+		"code": "JP"
+	}
+]
+</pre>
+</code>
+
+		<h3>Record Update Strategy - Replace Existing Values</h3>
+
+    	<p>
+    		With this strategy, the "Result RecordPath" property will be ignored and the configured Lookup 
+    		Service must be a single key lookup service. For each user-defined property, the value 
+    		contained in the field corresponding to the record path will be used as the key in the Lookup 
+    		Service and will be replaced by the value returned by the Lookup Service. It is possible to 
+    		configure multiple dynamic properties to update multiple fields in one execution. This strategy 
+    		only supports simple types replacements (strings, integers, etc).
+    	</p>
+
+    	<p>
+    		Since this strategy allows in-place replacement, it is possible to use Record Paths for fields 
+    		contained in arrays.
+    	</p>
+    	
+    	<p>
+    		Let's assume a Simple Key Value Lookup Service containing the following key/value pairs:
+    	</p>
+    	
+<code>
+<pre>
+FR => France
+CA => Canada
+fr => French
+en => English
+</pre>
+</code>
+		
+    	<p>
+    		Let's assume the following JSON with two records as input:
+    	</p>
+    	
+<code>
+<pre>
+[
+	{
+		"locales": [
+			{
+				"region": "FR",
+				"language": "fr"
+			}, {
+				"region": "US",
+				"language": "en"
+			}
+		]
+	}, {
+		"locales": [
+			{
+				"region": "CA",
+				"language": "fr"
+			}, 
+			{
+				"region": "JP",
+				"language": "ja"
+			}
+		]
+	}
+]
+</pre>
+</code>
+
+    	<p>
+    		The processor is configured with "Replace Existing Values" as "Record Update Strategy", 
+    		two user-defined properties are added: "region" => "/locales[*]/region" and "language" 
+    		=> "/locales[*]/language".
+    	</p>
+
+    	<p>
+    		When triggered, the processor will loop over the user-defined properties. First, it'll 
+    		search for the fields corresponding to "/locales[*]/region", for each value from the 
+    		record, the value will be used as the key with the Lookup Service and the value will 
+    		be replaced by the result returned by the Lookup Service. Example: the first region is 
+    		"FR" and this key is associated to the value "France" in the Lookup Service, so the 
+    		value "FR" is replaced by "France" in the record. With the above examples, it will 
+    		produce:
+    	</p>
+    	
+<code>
+<pre>
+[
+	{
+		"locales": [
+			{
+				"region": "France",
+				"language": "French"
+			}, {
+				"region": "US",
+				"language": "English"
+			}
+		]
+	}, {
+		"locales": [
+			{
+				"region": "Canada",
+				"language": "French"
+			}, 
+			{
+				"region": "JP",
+				"language": "ja"
+			}
+		]
+	}
+]
+</pre>
+</code>
+
+	</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index f8fb158..86bba8a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -19,9 +19,13 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.lookup.RecordLookupService;
 import org.apache.nifi.lookup.StringLookupService;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
@@ -37,6 +41,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -436,6 +442,88 @@ public class TestLookupRecord {
         out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
     }
 
+    @Test
+    public void testLookupArray() throws InitializationException, IOException {
+        TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
+        final MapLookup lookupService = new MapLookup();
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+        runner.addControllerService("writer", jsonWriter);
+        runner.enableControllerService(jsonWriter);
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
+        runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
+        runner.setProperty(LookupRecord.RECORD_READER, "reader");
+        runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
+        runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
+        runner.setProperty("lookupLanguage", "/locales[*]/language");
+        runner.setProperty("lookupRegion", "/locales[*]/region");
+        runner.setProperty("lookupFoo", "/foo/foo");
+
+        lookupService.addValue("FR", "France");
+        lookupService.addValue("CA", "Canada");
+        lookupService.addValue("fr", "French");
+        lookupService.addValue("key", "value");
+
+        runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
+        out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath());
+    }
+
+    @Test
+    public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException {
+        TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
+        final MapLookup lookupService = new MapLookup();
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+        runner.addControllerService("writer", jsonWriter);
+        runner.enableControllerService(jsonWriter);
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
+        runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
+        runner.setProperty(LookupRecord.RECORD_READER, "reader");
+        runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
+        runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
+        runner.setProperty("lookupLanguage", "/locales[*]/language");
+        runner.setProperty("lookupRegion", "/locales[*]/region");
+        runner.setProperty("lookupFoo", "/foo/foo");
+
+        lookupService.addValue("FR", "France");
+        lookupService.addValue("CA", "Canada");
+        lookupService.addValue("fr", "French");
+        lookupService.addValue("badkey", "value");
+
+        runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED);
+    }
+
     private static class MapLookup extends AbstractControllerService implements StringLookupService {
         private final Map<String, String> values = new HashMap<>();
         private Map<String, Object> expectedContext;
@@ -449,6 +537,7 @@ public class TestLookupRecord {
             return String.class;
         }
 
+        @Override
         public Optional<String> lookup(final Map<String, Object> coordinates, Map<String, String> context) {
             validateContext(context);
             return lookup(coordinates);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json
new file mode 100644
index 0000000..f2902cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json
@@ -0,0 +1,29 @@
+[
+  {
+    "foo" : {
+      "foo" : "key"
+    },
+    "locales": [
+      {
+        "language" : "fr",
+        "region" : "CA"
+      }, {
+        "language" : "fr",
+        "region" : "FR"
+      }
+    ]
+  }, {
+    "foo" : {
+      "foo" : "key"
+    },
+    "locales": [
+      {
+        "language" : "fr",
+        "region" : "CA"
+      }, {
+        "language" : "fr",
+        "region" : "FR"
+      }
+    ]
+  }
+]
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json
new file mode 100644
index 0000000..10169f8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json
@@ -0,0 +1 @@
+[{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]},{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]}]
\ No newline at end of file