You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jw...@apache.org on 2017/09/06 04:32:16 UTC
nifi git commit: NIFI-4116: Allow fields of Record returned from
Lookup Service to be placed into record in the input,
instead of requiring that the 'wrapper record' returned from Lookup be
included
Repository: nifi
Updated Branches:
refs/heads/master 20a6374bf -> bfd6c0aef
NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included
Signed-off-by: James Wing <jv...@gmail.com>
This closes #2110.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bfd6c0ae
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bfd6c0ae
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bfd6c0ae
Branch: refs/heads/master
Commit: bfd6c0aef768f53be8fdea62afdcd5404099e089
Parents: 20a6374
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Aug 25 16:22:43 2017 -0400
Committer: James Wing <jv...@gmail.com>
Committed: Tue Sep 5 21:29:16 2017 -0700
----------------------------------------------------------------------
.../nifi/processors/standard/LookupRecord.java | 56 +++++-
.../processors/standard/TestLookupRecord.java | 173 +++++++++++++++++++
2 files changed, 224 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 10539bc..286f7ee 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -95,6 +95,11 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
"Records will be routed to either a 'matched' or an 'unmatched' Relationship depending on whether or not there was a match in the configured Lookup Service. "
+ "A single input FlowFile may result in two different output FlowFiles.");
+ static final AllowableValue RESULT_ENTIRE_RECORD = new AllowableValue("insert-entire-record", "Insert Entire Record",
+ "The entire Record that is retrieved from the Lookup Service will be inserted into the destination path.");
+ static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields",
+ "All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path.");
+
static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
.name("lookup-service")
.displayName("Lookup Service")
@@ -114,6 +119,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
.required(false)
.build();
+ static final PropertyDescriptor RESULT_CONTENTS = new PropertyDescriptor.Builder()
+ .name("result-contents")
+ .displayName("Record Result Contents")
+ .description("When a result is obtained that contains a Record, this property determines whether the Record itself is inserted at the configured "
+ + "path or if the contents of the Record (i.e., the sub-fields) will be inserted at the configured path.")
+ .allowableValues(RESULT_ENTIRE_RECORD, RESULT_RECORD_FIELDS)
+ .defaultValue(RESULT_ENTIRE_RECORD.getValue())
+ .required(true)
+ .build();
+
static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder()
.name("routing-strategy")
.displayName("Routing Strategy")
@@ -161,6 +176,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
properties.add(LOOKUP_SERVICE);
properties.add(RESULT_RECORD_PATH);
properties.add(ROUTING_STRATEGY);
+ properties.add(RESULT_CONTENTS);
return properties;
}
@@ -272,14 +288,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
lookupCoordinates.put(coordinateKey, coordinateValue);
}
- final Optional<?> lookupValue;
+ final Optional<?> lookupValueOption;
try {
- lookupValue = lookupService.lookup(lookupCoordinates);
+ lookupValueOption = lookupService.lookup(lookupCoordinates);
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
}
- if (!lookupValue.isPresent()) {
+ if (!lookupValueOption.isPresent()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
return rels;
}
@@ -289,9 +305,39 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
if (resultPath != null) {
record.incorporateSchema(writeSchema);
- final Object replacementValue = lookupValue.get();
+ final Object lookupValue = lookupValueOption.get();
final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
- resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
+
+ final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
+ if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
+ final Record lookupRecord = (Record) lookupValue;
+
+ // Use wants to add all fields of the resultant Record to the specified Record Path.
+ // If the destination Record Path returns to us a Record, then we will add all field values of
+ // the Lookup Record to the destination Record. However, if the destination Record Path returns
+ // something other than a Record, then we can't add the fields to it. We can only replace it,
+ // because it doesn't make sense to add fields to anything but a Record.
+ resultPathResult.getSelectedFields().forEach(fieldVal -> {
+ final Object destinationValue = fieldVal.getValue();
+
+ if (destinationValue instanceof Record) {
+ final Record destinationRecord = (Record) destinationValue;
+
+ for (final String fieldName : lookupRecord.getRawFieldNames()) {
+ final Object value = lookupRecord.getValue(fieldName);
+ destinationRecord.setValue(fieldName, value);
+ }
+ } else {
+ final Optional<Record> parentOption = fieldVal.getParentRecord();
+
+ if (parentOption.isPresent()) {
+ parentOption.get().setValue(fieldVal.getField().getFieldName(), lookupRecord);
+ }
+ }
+ });
+ } else {
+ resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue));
+ }
}
final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
http://git-wip-us.apache.org/repos/asf/nifi/blob/bfd6c0ae/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index b84f518..29966e7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -17,19 +17,30 @@
package org.apache.nifi.processors.standard;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -227,6 +238,137 @@ public class TestLookupRecord {
}
+ @Test
+ public void testAddFieldsToExistingRecord() throws InitializationException, IOException {
+ final RecordLookup lookupService = new RecordLookup();
+ runner.addControllerService("lookup", lookupService);
+ runner.enableControllerService(lookupService);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+ sports.setValue("favorite", "basketball");
+ sports.setValue("least", "soccer");
+
+ lookupService.addValue("John Doe", sports);
+
+ recordReader = new MockRecordParser();
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("favorite", RecordFieldType.STRING);
+ recordReader.addSchemaField("least", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, null, "baseball");
+
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+
+ runner.setProperty("lookup", "/name");
+ runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
+ runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+ runner.enqueue("");
+ runner.run();
+
+ final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+ out.assertContentEquals("John Doe,48,basketball,soccer\n");
+ }
+
+ /**
+ * If the output fields are added to a record that doesn't exist, the result should be that a Record is
+ * created and the results added to it.
+ */
+ @Test
+ public void testAddFieldsToNonExistentRecord() throws InitializationException {
+ final RecordLookup lookupService = new RecordLookup();
+ runner.addControllerService("lookup", lookupService);
+ runner.enableControllerService(lookupService);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+ sports.setValue("favorite", "basketball");
+ sports.setValue("least", "soccer");
+
+ lookupService.addValue("John Doe", sports);
+
+ recordReader = new MockRecordParser();
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.RECORD);
+
+ recordReader.addRecord("John Doe", 48, null);
+
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+
+ runner.setProperty("lookup", "/name");
+ runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+ runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+ runner.enqueue("");
+ runner.run();
+
+ final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+ // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first
+ final String outputContents = new String(out.toByteArray());
+ assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
+ || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
+ }
+
+ /**
+ * If the output fields are added to a non-record field, then the result should be that the field
+ * becomes a UNION that does allow the Record and the value is set to a Record.
+ */
+ @Test
+ public void testAddFieldsToNonRecordField() throws InitializationException {
+ final RecordLookup lookupService = new RecordLookup();
+ runner.addControllerService("lookup", lookupService);
+ runner.enableControllerService(lookupService);
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+ final Record sports = new MapRecord(schema, new HashMap<String, Object>());
+
+ sports.setValue("favorite", "basketball");
+ sports.setValue("least", "soccer");
+
+ lookupService.addValue("John Doe", sports);
+
+ recordReader = new MockRecordParser();
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("age", RecordFieldType.INT);
+ recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+ recordReader.addRecord("John Doe", 48, null);
+
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+
+ runner.setProperty("lookup", "/name");
+ runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
+ runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);
+
+ runner.enqueue("");
+ runner.run();
+
+ final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
+
+ // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first
+ final String outputContents = new String(out.toByteArray());
+ assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
+ || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
+ }
+
private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>();
@@ -260,4 +402,35 @@ public class TestLookupRecord {
}
}
+ private static class RecordLookup extends AbstractControllerService implements RecordLookupService {
+ private final Map<String, Record> values = new HashMap<>();
+
+ public void addValue(final String key, final Record value) {
+ values.put(key, value);
+ }
+
+ @Override
+ public Class<?> getValueType() {
+ return String.class;
+ }
+
+ @Override
+ public Optional<Record> lookup(final Map<String, String> coordinates) {
+ if (coordinates == null) {
+ return Optional.empty();
+ }
+
+ final String key = coordinates.get("lookup");
+ if (key == null) {
+ return Optional.empty();
+ }
+
+ return Optional.ofNullable(values.get(key));
+ }
+
+ @Override
+ public Set<String> getRequiredKeys() {
+ return Collections.singleton("lookup");
+ }
+ }
}