You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/04/13 20:49:51 UTC

[nifi] branch main updated: NIFI-9903: This closes #5955. When using the 'success' relationship only for LookupRecord, look up all records until a match is found, in order to determine the resultant schema. Refactored code to eliminate AbstractRouteRecord, because LookupRecord is the last processor that extended from it. Refactored code to use an inner interface to clean up code.

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 05f3d7510f NIFI-9903: This closes #5955. When using the 'success' relationship only for LookupRecord, look up all records until a match is found, in order to determine the resultant schema. Refactored code to eliminate AbstractRouteRecord, because LookupRecord is the last processor that extended from it. Refactored code to use an inner interface to clean up code.
05f3d7510f is described below

commit 05f3d7510f24c24bd307461399c74195859d40e6
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Apr 11 13:50:06 2022 -0400

    NIFI-9903: This closes #5955. When using the 'success' relationship only for LookupRecord, look up all records until a match is found, in order to determine the resultant schema. Refactored code to eliminate AbstractRouteRecord, because LookupRecord is the last processor that extended from it. Refactored code to use an inner interface to clean up code.
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../processors/standard/AbstractRouteRecord.java   | 243 ---------
 .../nifi/processors/standard/LookupRecord.java     | 564 ++++++++++++++++-----
 .../nifi/processors/standard/TestLookupRecord.java |  18 +
 3 files changed, 459 insertions(+), 366 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
deleted file mode 100644
index 1a31d90eae..0000000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.Tuple;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
-    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
-        .name("record-reader")
-        .displayName("Record Reader")
-        .description("Specifies the Controller Service to use for reading incoming data")
-        .identifiesControllerService(RecordReaderFactory.class)
-        .required(true)
-        .build();
-    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Controller Service to use for writing out the records")
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .required(true)
-        .build();
-
-    static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
-            + "the unchanged FlowFile will be routed to this relationship")
-        .build();
-    static final Relationship REL_ORIGINAL = new Relationship.Builder()
-        .name("original")
-        .description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.")
-        .build();
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(RECORD_READER);
-        properties.add(RECORD_WRITER);
-        return properties;
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        if (isRouteOriginal()) {
-            relationships.add(REL_ORIGINAL);
-        }
-
-        relationships.add(REL_FAILURE);
-        return relationships;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final T flowFileContext;
-        try {
-            flowFileContext = getFlowFileContext(flowFile, context);
-        } catch (final Exception e) {
-            getLogger().error("Failed to process {}; routing to failure", new Object[] {flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        final AtomicInteger numRecords = new AtomicInteger(0);
-        final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
-        final FlowFile original = flowFile;
-        final Map<String, String> originalAttributes = original.getAttributes();
-
-        try {
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
-
-                        final Record firstRecord = reader.nextRecord();
-                        if (firstRecord == null) {
-                            getLogger().info("{} has no Records, so routing just the original FlowFile to 'original'", new Object[] {original});
-                            return;
-                        }
-
-                        final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, firstRecord.getSchema());
-
-                        final Set<Relationship> firstRecordRelationships = route(firstRecord, writeSchema, original, context, flowFileContext);
-                        for (final Relationship relationship : firstRecordRelationships) {
-                            writeRecord(firstRecord, relationship, writers, session, original, originalAttributes, writerFactory);
-                        }
-
-                        Record record;
-                        while ((record = reader.nextRecord()) != null) {
-                            final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
-                            numRecords.incrementAndGet();
-
-                            for (final Relationship relationship : relationships) {
-                                writeRecord(record, relationship, writers, session, original, originalAttributes, writerFactory);
-                            }
-                        }
-                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
-                        throw new ProcessException("Could not parse incoming data", e);
-                    }
-                }
-            });
-
-            for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) {
-                final Relationship relationship = entry.getKey();
-                final Tuple<FlowFile, RecordSetWriter> tuple = entry.getValue();
-                final RecordSetWriter writer = tuple.getValue();
-                FlowFile childFlowFile = tuple.getKey();
-
-                final WriteResult writeResult = writer.finishRecordSet();
-
-                try {
-                    writer.close();
-                } catch (final IOException ioe) {
-                    getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
-                }
-
-                final Map<String, String> attributes = new HashMap<>();
-                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-                attributes.putAll(writeResult.getAttributes());
-
-                childFlowFile = session.putAllAttributes(childFlowFile, attributes);
-                session.transfer(childFlowFile, relationship);
-                session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
-                session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
-
-                session.getProvenanceReporter().route(childFlowFile, relationship);
-            }
-        } catch (final Exception e) {
-            getLogger().error("Failed to process {}", new Object[] {flowFile, e});
-
-            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
-                try {
-                    tuple.getValue().close();
-                } catch (final Exception e1) {
-                    getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[] {tuple.getKey()});
-                }
-
-                session.remove(tuple.getKey());
-            }
-
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        } finally {
-            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
-                final RecordSetWriter writer = tuple.getValue();
-                try {
-                    writer.close();
-                } catch (final Exception e) {
-                    getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[] {tuple.getKey(), e});
-                }
-            }
-        }
-
-        if (isRouteOriginal()) {
-            flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords));
-            session.transfer(flowFile, REL_ORIGINAL);
-        } else {
-            session.remove(flowFile);
-        }
-
-        getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords});
-    }
-
-    private void writeRecord(final Record record, final Relationship relationship, final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers, final ProcessSession session,
-                             final FlowFile original, final Map<String, String> originalAttributes, final RecordSetWriterFactory writerFactory) throws IOException, SchemaNotFoundException {
-        final RecordSetWriter recordSetWriter;
-        Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
-
-        if (tuple == null) {
-            final FlowFile outFlowFile = session.create(original);
-            final OutputStream out = session.write(outFlowFile);
-
-            final RecordSchema recordWriteSchema = writerFactory.getSchema(originalAttributes, record.getSchema());
-            recordSetWriter = writerFactory.createWriter(getLogger(), recordWriteSchema, out, outFlowFile);
-            recordSetWriter.beginRecordSet();
-
-            tuple = new Tuple<>(outFlowFile, recordSetWriter);
-            writers.put(relationship, tuple);
-        } else {
-            recordSetWriter = tuple.getValue();
-        }
-
-        recordSetWriter.write(record);
-    }
-
-    protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);
-
-    protected abstract boolean isRouteOriginal();
-
-    protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context);
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index e25077dc38..4a25736229 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -35,15 +35,28 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.lookup.LookupFailureException;
 import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.util.RecordPathCache;
 import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -52,6 +65,9 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.Tuple;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -89,9 +105,9 @@ import java.util.stream.Collectors;
                     description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
 @SeeAlso(value = {ConvertRecord.class, SplitRecord.class},
         classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"})
-public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> {
+public class LookupRecord extends AbstractProcessor {
 
-    private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
+    private final RecordPathCache recordPathCache = new RecordPathCache(25);
     private volatile LookupService<?> lookupService;
 
     static final AllowableValue ROUTE_TO_SUCCESS = new AllowableValue("route-to-success", "Route to 'success'",
@@ -113,6 +129,22 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
             + "Service and the value returned by the Lookup Service will be used to replace the existing value. It is possible to configure multiple dynamic properties "
             + "to replace multiple values in one execution. This strategy only supports simple types replacements (strings, integers, etc).");
 
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
     static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
         .name("lookup-service")
         .displayName("Lookup Service")
@@ -174,6 +206,10 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         .name("success")
         .description("All records will be sent to this Relationship if configured to do so, unless a failure occurs")
         .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a FlowFile cannot be enriched, the unchanged FlowFile will be routed to this relationship")
+        .build();
 
     private static final Set<Relationship> MATCHED_COLLECTION = Collections.singleton(REL_MATCHED);
     private static final Set<Relationship> UNMATCHED_COLLECTION = Collections.singleton(REL_UNMATCHED);
@@ -195,7 +231,8 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.addAll(super.getSupportedPropertyDescriptors());
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
         properties.add(LOOKUP_SERVICE);
         properties.add(RESULT_RECORD_PATH);
         properties.add(ROUTING_STRATEGY);
@@ -291,177 +328,375 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
         }
     }
 
+
     @Override
-    protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
-        final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
 
-        final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue());
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        if(isInPlaceReplacement) {
-            return doInPlaceReplacement(record, flowFile, context, flowFileContext);
-        } else {
-            return doResultPathReplacement(record, flowFile, context, flowFileContext);
+        final FlowFile original = flowFile;
+        final Map<String, String> originalAttributes = original.getAttributes();
+
+        final LookupContext lookupContext = createLookupContext(flowFile, context, session, writerFactory);
+        final ReplacementStrategy replacementStrategy = createReplacementStrategy(context);
+
+        final RecordSchema enrichedSchema;
+        try {
+            enrichedSchema = replacementStrategy.determineResultSchema(readerFactory, writerFactory, context, session, flowFile, lookupContext);
+        } catch (final Exception e) {
+            getLogger().error("Could not determine schema to use for enriched FlowFiles", e);
+            session.transfer(original, REL_FAILURE);
+            return;
         }
 
-    }
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
+
+                        final Map<Relationship, RecordSchema> writeSchemas = new HashMap<>();
+
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+                            final Set<Relationship> relationships = replacementStrategy.lookup(record, context, lookupContext);
+
+                            for (final Relationship relationship : relationships) {
+                                // Determine the Write Schema to use for each relationship
+                                RecordSchema writeSchema = writeSchemas.get(relationship);
+                                if (writeSchema == null) {
+                                    final RecordSchema outputSchema = enrichedSchema == null ? record.getSchema() : enrichedSchema;
+                                    writeSchema = writerFactory.getSchema(originalAttributes, outputSchema);
+                                    writeSchemas.put(relationship, writeSchema);
+                                }
 
-    private Set<Relationship> doInPlaceReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
-        final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
-        final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
-        final String coordinateKey = lookupService.getRequiredKeys().iterator().next();
-        boolean hasUnmatchedValue = false;
+                                final RecordSetWriter writer = lookupContext.getRecordWriterForRelationship(relationship, writeSchema);
+                                writer.write(record);
+                            }
+                        }
+                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                        throw new ProcessException("Could not parse incoming data", e);
+                    }
+                }
+            });
 
-        for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
-            final RecordPath recordPath = entry.getValue();
+            for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
+                final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
+                FlowFile childFlowFile = lookupContext.getFlowFileForRelationship(relationship);
 
-            final RecordPathResult pathResult = recordPath.evaluate(record);
-            final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
-                .filter(fieldVal -> fieldVal.getValue() != null)
-                .collect(Collectors.toList());
+                final WriteResult writeResult = writer.finishRecordSet();
 
-            if (lookupFieldValues.isEmpty()) {
-                final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-                getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
-                return rels;
+                try {
+                    writer.close();
+                } catch (final IOException ioe) {
+                    getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
+                }
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+
+                childFlowFile = session.putAllAttributes(childFlowFile, attributes);
+                session.transfer(childFlowFile, relationship);
+                session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
+                session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
+
+                session.getProvenanceReporter().route(childFlowFile, relationship);
             }
 
-            for (FieldValue fieldValue : lookupFieldValues) {
-                final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
+        } catch (final Exception e) {
+            getLogger().error("Failed to process {}", new Object[]{flowFile, e});
 
-                lookupCoordinates.clear();
-                lookupCoordinates.put(coordinateKey, coordinateValue);
+            for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
+                final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
+                final FlowFile childFlowFile = lookupContext.getFlowFileForRelationship(relationship);
 
-                final Optional<?> lookupValueOption;
                 try {
-                    lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
+                    writer.close();
+                } catch (final Exception e1) {
+                    getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", writer);
+                }
+
+                session.remove(childFlowFile);
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } finally {
+            for (final Relationship relationship : lookupContext.getRelationshipsUsed()) {
+                final RecordSetWriter writer = lookupContext.getExistingRecordWriterForRelationship(relationship);
+
+                try {
+                    writer.close();
                 } catch (final Exception e) {
-                    throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
+                    getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", writer, e);
                 }
+            }
+        }
+
+        session.remove(flowFile);
+        getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records",
+            flowFile, lookupContext.getRelationshipsUsed().size(), replacementStrategy.getLookupCount());
+    }
+
+    private ReplacementStrategy createReplacementStrategy(final ProcessContext context) {
+        final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue());
+
+        if (isInPlaceReplacement) {
+            return new InPlaceReplacementStrategy();
+        } else {
+            return new RecordPathReplacementStrategy();
+        }
+    }
+
+
+    private class InPlaceReplacementStrategy implements ReplacementStrategy {
+        private int lookupCount = 0;
+
+        @Override
+        public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) {
+            lookupCount++;
+
+            final Map<String, RecordPath> recordPaths = lookupContext.getRecordPathsByCoordinateKey();
+            final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
+            final String coordinateKey = lookupService.getRequiredKeys().iterator().next();
+            final FlowFile flowFile = lookupContext.getOriginalFlowFile();
+
+            boolean hasUnmatchedValue = false;
+            for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
+                final RecordPath recordPath = entry.getValue();
+
+                final RecordPathResult pathResult = recordPath.evaluate(record);
+                final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
+                    .filter(fieldVal -> fieldVal.getValue() != null)
+                    .collect(Collectors.toList());
 
-                if (!lookupValueOption.isPresent()) {
-                    hasUnmatchedValue = true;
-                    continue;
+                if (lookupFieldValues.isEmpty()) {
+                    final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                    getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
+                    return rels;
                 }
 
-                final Object lookupValue = lookupValueOption.get();
+                for (final FieldValue fieldValue : lookupFieldValues) {
+                    final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
 
-                final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
-                fieldValue.updateValue(lookupValue, inferredDataType);
+                    lookupCoordinates.clear();
+                    lookupCoordinates.put(coordinateKey, coordinateValue);
+
+                    final Optional<?> lookupValueOption;
+                    try {
+                        lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
+                    } catch (final Exception e) {
+                        throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
+                    }
+
+                    if (!lookupValueOption.isPresent()) {
+                        hasUnmatchedValue = true;
+                        continue;
+                    }
+
+                    final Object lookupValue = lookupValueOption.get();
+
+                    final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
+                    fieldValue.updateValue(lookupValue, inferredDataType);
+                }
+            }
+
+            if (hasUnmatchedValue) {
+                return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+            } else {
+                return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
             }
         }
 
-        if (hasUnmatchedValue) {
-            return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-        } else {
-            return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
+        @Override
+        public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext context, final ProcessSession session,
+                                                  final FlowFile flowFile, final LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException {
+
+            try (final InputStream in = session.read(flowFile);
+                 final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                return reader.getSchema();
+            }
+        }
+
+        @Override
+        public int getLookupCount() {
+            return lookupCount;
         }
     }
 
-    private Set<Relationship> doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
-        final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
-        final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
 
-        for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
-            final String coordinateKey = entry.getKey();
-            final RecordPath recordPath = entry.getValue();
+    private class RecordPathReplacementStrategy implements ReplacementStrategy {
+        private int lookupCount = 0;
 
-            final RecordPathResult pathResult = recordPath.evaluate(record);
-            final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
-                .filter(fieldVal -> fieldVal.getValue() != null)
-                .collect(Collectors.toList());
+        @Override
+        public Set<Relationship> lookup(final Record record, final ProcessContext context, final LookupContext lookupContext) {
+            lookupCount++;
 
-            if (lookupFieldValues.isEmpty()) {
+            final Map<String, Object> lookupCoordinates = createLookupCoordinates(record, lookupContext, true);
+            if (lookupCoordinates.isEmpty()) {
                 final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-                getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
                 return rels;
             }
 
-            if (lookupFieldValues.size() > 1) {
+            final FlowFile flowFile = lookupContext.getOriginalFlowFile();
+            final Optional<?> lookupValueOption;
+            try {
+                lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
+            } catch (final Exception e) {
+                throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
+            }
+
+            if (!lookupValueOption.isPresent()) {
                 final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-                getLogger().debug("RecordPath for property '{}' matched {} fields in a record for {}; routing record to {}",
-                    new Object[] {coordinateKey, lookupFieldValues.size(), flowFile, rels});
                 return rels;
             }
 
-            final FieldValue fieldValue = lookupFieldValues.get(0);
-            final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
-            lookupCoordinates.put(coordinateKey, coordinateValue);
-        }
+            applyLookupResult(record, context, lookupContext, lookupValueOption.get());
 
-        final Optional<?> lookupValueOption;
-        try {
-            lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
-        } catch (final Exception e) {
-            throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
-        }
-
-        if (!lookupValueOption.isPresent()) {
-            final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+            final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
             return rels;
         }
 
-        // Ensure that the Record has the appropriate schema to account for the newly added values
-        final RecordPath resultPath = flowFileContext.getValue();
-        if (resultPath != null) {
-            final Object lookupValue = lookupValueOption.get();
-            final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record);
-
-            final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
-            if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
-                final Record lookupRecord = (Record) lookupValue;
-
-                // User wants to add all fields of the resultant Record to the specified Record Path.
-                // If the destination Record Path returns to us a Record, then we will add all field values of
-                // the Lookup Record to the destination Record. However, if the destination Record Path returns
-                // something other than a Record, then we can't add the fields to it. We can only replace it,
-                // because it doesn't make sense to add fields to anything but a Record.
-                resultPathResult.getSelectedFields().forEach(fieldVal -> {
-                    final Object destinationValue = fieldVal.getValue();
-
-                    if (destinationValue instanceof Record) {
-                        final Record destinationRecord = (Record) destinationValue;
-
-                        for (final String fieldName : lookupRecord.getRawFieldNames()) {
-                            final Object value = lookupRecord.getValue(fieldName);
-
-                            final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
-                            if (recordFieldOption.isPresent()) {
-                                // Even if the looked up field is not nullable, if the lookup key didn't match with any record,
-                                // and matched/unmatched records are written to the same FlowFile routed to 'success' relationship,
-                                // then enriched fields should be nullable to support unmatched records whose enriched fields will be null.
-                                RecordField field = recordFieldOption.get();
-                                if (!routeToMatchedUnmatched && !field.isNullable()) {
-                                    field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
+        private void applyLookupResult(final Record record, final ProcessContext context, final LookupContext lookupContext, final Object lookupValue) {
+            // Ensure that the Record has the appropriate schema to account for the newly added values
+            final RecordPath resultPath = lookupContext.getResultRecordPath();
+            if (resultPath != null) {
+                final RecordPathResult resultPathResult = resultPath.evaluate(record);
+
+                final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue();
+                if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) {
+                    final Record lookupRecord = (Record) lookupValue;
+
+                    // User wants to add all fields of the resultant Record to the specified Record Path.
+                    // If the destination Record Path returns to us a Record, then we will add all field values of
+                    // the Lookup Record to the destination Record. However, if the destination Record Path returns
+                    // something other than a Record, then we can't add the fields to it. We can only replace it,
+                    // because it doesn't make sense to add fields to anything but a Record.
+                    resultPathResult.getSelectedFields().forEach(fieldVal -> {
+                        final Object destinationValue = fieldVal.getValue();
+
+                        if (destinationValue instanceof Record) {
+                            final Record destinationRecord = (Record) destinationValue;
+
+                            for (final String fieldName : lookupRecord.getRawFieldNames()) {
+                                final Object value = lookupRecord.getValue(fieldName);
+
+                                final Optional<RecordField> recordFieldOption = lookupRecord.getSchema().getField(fieldName);
+                                if (recordFieldOption.isPresent()) {
+                                    // Even if the looked up field is not nullable, if the lookup key didn't match with any record,
+                                    // and matched/unmatched records are written to the same FlowFile routed to 'success' relationship,
+                                    // then enriched fields should be nullable to support unmatched records whose enriched fields will be null.
+                                    RecordField field = recordFieldOption.get();
+                                    if (!routeToMatchedUnmatched && !field.isNullable()) {
+                                        field = new RecordField(field.getFieldName(), field.getDataType(), field.getDefaultValue(), field.getAliases(), true);
+                                    }
+                                    destinationRecord.setValue(field, value);
+                                } else {
+                                    destinationRecord.setValue(fieldName, value);
                                 }
-                                destinationRecord.setValue(field, value);
-                            } else {
-                                destinationRecord.setValue(fieldName, value);
                             }
+                        } else {
+                            final Optional<Record> parentOption = fieldVal.getParentRecord();
+                            parentOption.ifPresent(parent -> parent.setValue(fieldVal.getField(), lookupRecord));
                         }
-                    } else {
-                        final Optional<Record> parentOption = fieldVal.getParentRecord();
-                        parentOption.ifPresent(parent -> parent.setValue(fieldVal.getField(), lookupRecord));
+                    });
+                } else {
+                    final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
+                    resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
+                }
+
+                record.incorporateInactiveFields();
+            }
+        }
+
+        @Override
+        public RecordSchema determineResultSchema(final RecordReaderFactory readerFactory, final RecordSetWriterFactory writerFactory, final ProcessContext context, final ProcessSession session,
+                                                  final FlowFile flowFile, final LookupContext lookupContext)
+                throws IOException, SchemaNotFoundException, MalformedRecordException, LookupFailureException {
+
+            final Map<String, String> flowFileAttributes = flowFile.getAttributes();
+            try (final InputStream in = session.read(flowFile);
+                 final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    final Map<String, Object> lookupCoordinates = createLookupCoordinates(record, lookupContext, false);
+                    if (lookupCoordinates.isEmpty()) {
+                        continue;
                     }
-                });
-            } else {
-                final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
-                resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue, inferredDataType));
+
+                    final Optional<?> lookupResult = lookupService.lookup(lookupCoordinates, flowFileAttributes);
+                    if (!lookupResult.isPresent()) {
+                        continue;
+                    }
+
+                    applyLookupResult(record, context, lookupContext, lookupResult.get());
+                    getLogger().debug("Found a Record for {} that returned a result from the LookupService. Will provide the following schema to the Writer: {}", flowFile, record.getSchema());
+                    return record.getSchema();
+                }
+
+                getLogger().debug("Found no Record for {} that returned a result from the LookupService. Will provide Reader's schema to the Writer.", flowFile);
+                return reader.getSchema();
+            }
+        }
+
+        private Map<String, Object> createLookupCoordinates(final Record record, final LookupContext lookupContext, final boolean logIfNotMatched) {
+            final Map<String, RecordPath> recordPaths = lookupContext.getRecordPathsByCoordinateKey();
+            final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
+            final FlowFile flowFile = lookupContext.getOriginalFlowFile();
+
+            for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
+                final String coordinateKey = entry.getKey();
+                final RecordPath recordPath = entry.getValue();
+
+                final RecordPathResult pathResult = recordPath.evaluate(record);
+                final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
+                    .filter(fieldVal -> fieldVal.getValue() != null)
+                    .collect(Collectors.toList());
+
+                if (lookupFieldValues.isEmpty()) {
+                    if (logIfNotMatched) {
+                        final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                        getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", coordinateKey, flowFile, rels);
+                    }
+
+                    return Collections.emptyMap();
+                }
+
+                if (lookupFieldValues.size() > 1) {
+                    if (logIfNotMatched) {
+                        final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                        getLogger().debug("RecordPath for property '{}' matched {} fields in a record for {}; routing record to {}",
+                            coordinateKey, lookupFieldValues.size(), flowFile, rels);
+                    }
+
+                    return Collections.emptyMap();
+                }
+
+                final FieldValue fieldValue = lookupFieldValues.get(0);
+                final Object coordinateValue = DataTypeUtils.convertType(fieldValue.getValue(), fieldValue.getField().getDataType(), null, null, null, fieldValue.getField().getFieldName());
+                lookupCoordinates.put(coordinateKey, coordinateValue);
             }
 
-            record.incorporateInactiveFields();
+            return lookupCoordinates;
         }
 
-        final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
-        return rels;
+        @Override
+        public int getLookupCount() {
+            return lookupCount;
+        }
     }
 
-    @Override
-    protected boolean isRouteOriginal() {
-        return false;
-    }
 
-    @Override
-    protected Tuple<Map<String, RecordPath>, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
+    protected LookupContext createLookupContext(final FlowFile flowFile, final ProcessContext context, final ProcessSession session, final RecordSetWriterFactory writerFactory) {
         final Map<String, RecordPath> recordPaths = new HashMap<>();
         for (final PropertyDescriptor prop : context.getProperties().keySet()) {
             if (!prop.isDynamic()) {
@@ -481,7 +716,90 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
             resultRecordPath = null;
         }
 
-        return new Tuple<>(recordPaths, resultRecordPath);
+        return new LookupContext(recordPaths, resultRecordPath, session, flowFile, writerFactory, getLogger());
     }
 
+    private interface ReplacementStrategy {
+        Set<Relationship> lookup(Record record, ProcessContext context, LookupContext lookupContext);
+
+        RecordSchema determineResultSchema(RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, ProcessContext context, ProcessSession session, FlowFile flowFile,
+                                           LookupContext lookupContext) throws IOException, SchemaNotFoundException, MalformedRecordException, LookupFailureException;
+
+        int getLookupCount();
+    }
+
+
+    private static class LookupContext {
+        private final Map<String, RecordPath> recordPathsByCoordinateKey;
+        private final RecordPath resultRecordPath;
+        private final ProcessSession session;
+        private final FlowFile flowFile;
+        private final RecordSetWriterFactory writerFactory;
+        private final ComponentLog logger;
+
+        private final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writersByRelationship = new HashMap<>();
+
+
+        public LookupContext(final Map<String, RecordPath> recordPathsByCoordinateKey, final RecordPath resultRecordPath, final ProcessSession session, final FlowFile flowFile,
+                             final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+            this.recordPathsByCoordinateKey = recordPathsByCoordinateKey;
+            this.resultRecordPath = resultRecordPath;
+            this.session = session;
+            this.flowFile = flowFile;
+            this.writerFactory = writerFactory;
+            this.logger = logger;
+        }
+
+        public Map<String, RecordPath> getRecordPathsByCoordinateKey() {
+            return recordPathsByCoordinateKey;
+        }
+
+        public RecordPath getResultRecordPath() {
+            return resultRecordPath;
+        }
+
+        public FlowFile getOriginalFlowFile() {
+            return flowFile;
+        }
+
+        private Set<Relationship> getRelationshipsUsed() {
+            return writersByRelationship.keySet();
+        }
+
+        public FlowFile getFlowFileForRelationship(final Relationship relationship) {
+            final Tuple<FlowFile, RecordSetWriter> tuple = writersByRelationship.get(relationship);
+            return tuple.getKey();
+        }
+
+        public RecordSetWriter getExistingRecordWriterForRelationship(final Relationship relationship) {
+            final Tuple<FlowFile, RecordSetWriter> tuple = writersByRelationship.get(relationship);
+            return tuple.getValue();
+        }
+
+        public RecordSetWriter getRecordWriterForRelationship(final Relationship relationship, final RecordSchema schema) throws IOException, SchemaNotFoundException {
+            final Tuple<FlowFile, RecordSetWriter> tuple = writersByRelationship.get(relationship);
+            if (tuple != null) {
+                return tuple.getValue();
+            }
+
+            final FlowFile outFlowFile = session.create(flowFile);
+            final OutputStream out = session.write(outFlowFile);
+            try {
+                final RecordSchema recordWriteSchema = writerFactory.getSchema(flowFile.getAttributes(), schema);
+                final RecordSetWriter recordSetWriter = writerFactory.createWriter(logger, recordWriteSchema, out, outFlowFile);
+                recordSetWriter.beginRecordSet();
+
+                writersByRelationship.put(relationship, new Tuple<>(outFlowFile, recordSetWriter));
+                return recordSetWriter;
+            } catch (final Exception e) {
+                try {
+                    out.close();
+                } catch (final Exception e1) {
+                    e.addSuppressed(e1);
+                }
+
+                throw e;
+            }
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index 251a24d9f5..89a0ed3759 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -194,6 +194,24 @@ public class TestLookupRecord {
         unmatched.assertContentEquals("Jane Doe,47,\n");
     }
 
+    @Test
+    public void testAllMatchButFirstRouteToSuccess() {
+        lookupService.addValue("Jane Doe", "Soccer");
+        lookupService.addValue("Jimmy Doe", "Football");
+        runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(LookupRecord.REL_FAILURE, 0);
+        runner.assertTransferCount(LookupRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile matched = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
+        matched.assertAttributeEquals("record.count", "3");
+        matched.assertAttributeEquals("mime.type", "text/plain");
+        matched.assertContentEquals("John Doe,48,\nJane Doe,47,Soccer\nJimmy Doe,14,Football\n");
+    }
+
 
     @Test
     public void testResultPathNotFound() {