You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jw...@apache.org on 2017/09/06 04:32:16 UTC

nifi git commit: NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included

Repository: nifi
Updated Branches:
  refs/heads/master 20a6374bf -> bfd6c0aef


NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included

Signed-off-by: James Wing <jv...@gmail.com>

This closes #2110.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bfd6c0ae
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bfd6c0ae
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bfd6c0ae

Branch: refs/heads/master
Commit: bfd6c0aef768f53be8fdea62afdcd5404099e089
Parents: 20a6374
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Aug 25 16:22:43 2017 -0400
Committer: James Wing <jv...@gmail.com>
Committed: Tue Sep 5 21:29:16 2017 -0700

----------------------------------------------------------------------
 .../nifi/processors/standard/LookupRecord.java  |  56 +++++-
 .../processors/standard/TestLookupRecord.java   | 173 +++++++++++++++++++
 2 files changed, 224 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 10539bc..286f7ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -95,6 +95,11 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         "Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. "
             + "A single input FlowFile may result in two different output FlowFiles.");
 
+    static final AllowableValue RESULT_ENTIRE_RECORD = new AllowableValue("insert-entire-record", "Insert Entire Record",
+        "The entire Record that is retrieved from the Lookup Service will be inserted into the destination path."); // default value of RESULT_CONTENTS
+    static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields",
+        "All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path."); // honored only when the lookup value is a Record
+
     static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
         .name("lookup-service")
         .displayName("Lookup Service")
@@ -114,6 +119,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         .required(false)
         .build();
 
+    static final PropertyDescriptor RESULT_CONTENTS = new PropertyDescriptor.Builder() // selects how a Record lookup result is written at the result path
+        .name("result-contents")
+        .displayName("Record Result Contents")
+        .description("When a result is obtained that contains a Record, this property determines whether the Record itself is inserted at the configured "
+            + "path or if the contents of the Record (i.e., the sub-fields) will be inserted at the configured path.")
+        .allowableValues(RESULT_ENTIRE_RECORD, RESULT_RECORD_FIELDS)
+        .defaultValue(RESULT_ENTIRE_RECORD.getValue()) // default: insert the Record itself rather than its fields
+        .required(true)
+        .build();
+
     static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder()
         .name("routing-strategy")
         .displayName("Routing Strategy")
@@ -161,6 +176,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         properties.add(LOOKUP_SERVICE);
         properties.add(RESULT_RECORD_PATH);
         properties.add(ROUTING_STRATEGY);
+        properties.add(RESULT_CONTENTS);
         return properties;
     }
 
@@ -272,14 +288,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
             lookupCoordinates.put(coordinateKey, coordinateValue);
         }
 
-        final Optional<?> lookupValue;
+        final Optional<?> lookupValueOption;
         try {
-            lookupValue = lookupService.lookup(lookupCoordinates);
+            lookupValueOption = lookupService.lookup(lookupCoordinates);
         } catch (final Exception e) {
             throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
         }
 
-        if (!lookupValue.isPresent()) {
+        if (!lookupValueOption.isPresent()) {
             final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
             return rels;
         }
@@ -289,9 +305,39 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         if (resultPath != null) {
             record.incorporateSchema(writeSchema);
 
-            final Object replacementValue = lookupValue.get();
+            final Object lookupValue = lookupValueOption.get();
             final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
-            resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
+
+            final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
+            if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
+                final Record lookupRecord = (Record) lookupValue;
+
+                // User wants to add all fields of the resultant Record to the specified Record Path.
+                // If the destination Record Path returns to us a Record, then we will add all field values of
+                // the Lookup Record to the destination Record. However, if the destination Record Path returns
+                // something other than a Record, then we can't add the fields to it. We can only replace it,
+                // because it doesn't make sense to add fields to anything but a Record.
+                resultPathResult.getSelectedFields().forEach(fieldVal -> {
+                    final Object destinationValue = fieldVal.getValue();
+
+                    if (destinationValue instanceof Record) {
+                        final Record destinationRecord = (Record) destinationValue;
+
+                        for (final String fieldName : lookupRecord.getRawFieldNames()) {
+                            final Object value = lookupRecord.getValue(fieldName);
+                            destinationRecord.setValue(fieldName, value);
+                        }
+                    } else {
+                        final Optional<Record> parentOption = fieldVal.getParentRecord();
+
+                        if (parentOption.isPresent()) {
+                            parentOption.get().setValue(fieldVal.getField().getFieldName(), lookupRecord);
+                        }
+                    }
+                });
+            } else {
+                resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue));
+            }
         }
 
         final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;

http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index b84f518..29966e7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -17,19 +17,30 @@
 
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.lookup.RecordLookupService;
 import org.apache.nifi.lookup.StringLookupService;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -227,6 +238,137 @@ public class TestLookupRecord {
     }
 
 
+    @Test
+    public void testAddFieldsToExistingRecord() throws InitializationException, IOException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+        // Lookup result registered for "John Doe": a Record with STRING fields 'favorite' and 'least'.
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+        // The input schema already declares top-level 'favorite' and 'least' fields.
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("favorite", RecordFieldType.STRING);
+        recordReader.addSchemaField("least", RecordFieldType.STRING);
+        // Input row: the last two values are null and "baseball" (presumably matched to the schema fields in order).
+        recordReader.addRecord("John Doe", 48, null, "baseball");
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        // Destination path is "/", and the lookup Record's fields (not the Record itself) are requested.
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+        // Expect the output row to carry the lookup values "basketball" and "soccer" in the last two columns.
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+        out.assertContentEquals("John Doe,48,basketball,soccer\n");
+    }
+
+    /**
+     * If the output fields are added to a record that doesn't exist (the 'sport' RECORD field is null in the
+     * input), the result should be that the field is populated with a Record holding the lookup results.
+     */
+    @Test
+    public void testAddFieldsToNonExistentRecord() throws InitializationException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+        // Lookup result registered for "John Doe": a Record with STRING fields 'favorite' and 'least'.
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+        // The input schema declares 'sport' as a RECORD, but the row below supplies null for it.
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.RECORD);
+
+        recordReader.addRecord("John Doe", 48, null);
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+        // Decode with an explicit charset (not the platform default). We can't be sure of the order of the
+        // fields in the record, so we allow either 'least' or 'favorite' to be first
+        final String outputContents = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
+        assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
+            || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
+    }
+
+    /**
+     * If the output fields are added to a non-record field, then the result should be that the field
+     * becomes a UNION that also allows a Record, and the value is set to the Record from the lookup.
+     */
+    @Test
+    public void testAddFieldsToNonRecordField() throws InitializationException {
+        final RecordLookup lookupService = new RecordLookup();
+        runner.addControllerService("lookup", lookupService);
+        runner.enableControllerService(lookupService);
+        // Lookup result registered for "John Doe": a Record with STRING fields 'favorite' and 'least'.
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+        sports.setValue("favorite", "basketball");
+        sports.setValue("least", "soccer");
+
+        lookupService.addValue("John Doe", sports);
+        // Here the input schema declares 'sport' as a STRING, i.e. the destination is not a Record field.
+        recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, null);
+
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        runner.setProperty("lookup", "/name");
+        runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+        runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+        runner.enqueue("");
+        runner.run();
+
+        final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+        // Decode with an explicit charset (not the platform default). We can't be sure of the order of the
+        // fields in the record, so we allow either 'least' or 'favorite' to be first
+        final String outputContents = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
+        assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
+            || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
+    }
+
 
     private static class MapLookup extends AbstractControllerService implements StringLookupService {
         private final Map<String, String> values = new HashMap<>();
@@ -260,4 +402,35 @@ public class TestLookupRecord {
         }
     }
 
+    private static class RecordLookup extends AbstractControllerService implements RecordLookupService { // in-memory lookup service used by the tests above
+        private final Map<String, Record> values = new HashMap<>(); // lookup key -> Record to return
+        // Registers the Record to return when the "lookup" coordinate equals the given key.
+        public void addValue(final String key, final Record value) {
+            values.put(key, value);
+        }
+        // lookup() yields Record values, so the advertised value type is Record rather than String.
+        @Override
+        public Class<?> getValueType() {
+            return Record.class;
+        }
+        // Returns the Record registered under the "lookup" coordinate; empty if the coordinates or key are missing or unmapped.
+        @Override
+        public Optional<Record> lookup(final Map<String, String> coordinates) {
+            if (coordinates == null) {
+                return Optional.empty();
+            }
+
+            final String key = coordinates.get("lookup");
+            if (key == null) {
+                return Optional.empty();
+            }
+
+            return Optional.ofNullable(values.get(key));
+        }
+        // The only coordinate key this service requires is "lookup".
+        @Override
+        public Set<String> getRequiredKeys() {
+            return Collections.singleton("lookup");
+        }
+    }
 }