Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/19 06:10:06 UTC

[4/6] nifi git commit: NIFI-3863: Initial implementation of Lookup Services. Implemented LookupRecord processors. This required some refactoring of the RecordSetWriter interface, so that interface and all of its implementations and references were refactored

NIFI-3863: Initial implementation of Lookup Services. Implemented LookupRecord processors. This required some refactoring of the RecordSetWriter interface, so that interface and all of its implementations and references were refactored


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9bd0246a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9bd0246a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9bd0246a

Branch: refs/heads/master
Commit: 9bd0246a96cfcda01b972834823250a9cddf77a8
Parents: 3b98abb
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 10 13:09:45 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri May 19 01:02:41 2017 -0400

----------------------------------------------------------------------
 .../controller/AbstractControllerService.java   |  28 ++
 nifi-assembly/pom.xml                           |   5 +
 .../nifi/record/path/StandardFieldValue.java    |   7 +-
 .../serialization/AbstractRecordSetWriter.java  | 105 ++++++
 .../nifi/serialization/RecordSetWriter.java     |  21 +-
 .../apache/nifi/serialization/RecordWriter.java |   7 +-
 .../hadoop/AbstractFetchHDFSRecord.java         |  11 +-
 .../serialization/record/MockRecordParser.java  |   5 +-
 .../serialization/record/MockRecordWriter.java  |  66 ++--
 .../repository/StandardProcessSession.java      |  12 +-
 .../processors/kafka/pubsub/ConsumerLease.java  |  32 +-
 .../kafka/pubsub/PublishKafkaRecord_0_10.java   |  14 +-
 .../processors/kafka/pubsub/PublisherLease.java |  14 +-
 .../pubsub/TestPublishKafkaRecord_0_10.java     |  19 +-
 .../kafka/pubsub/util/MockRecordWriter.java     |  19 +-
 .../processors/parquet/FetchParquetTest.java    |   4 +-
 .../services/AvroSchemaValidator.java           |   5 +-
 .../record/script/ScriptedRecordSetWriter.java  |   9 +-
 .../script/ScriptedRecordSetWriterTest.groovy   |   7 +-
 .../groovy/test_record_writer_inline.groovy     |  35 +-
 .../nifi-standard-processors/pom.xml            |   4 +
 .../standard/AbstractRecordProcessor.java       |  33 +-
 .../standard/AbstractRouteRecord.java           | 223 +++++++++++++
 .../nifi/processors/standard/LookupRecord.java  | 208 ++++++++++++
 .../nifi/processors/standard/QueryRecord.java   |  29 +-
 .../nifi/processors/standard/SplitRecord.java   |  31 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestLookupRecord.java   | 232 +++++++++++++
 .../processors/standard/TestQueryRecord.java    |  20 +-
 .../nifi-lookup-service-api/pom.xml             |  36 ++
 .../nifi/lookup/LookupFailureException.java     |  37 +++
 .../org/apache/nifi/lookup/LookupService.java   |  41 +++
 .../apache/nifi/lookup/RecordLookupService.java |  41 +++
 .../apache/nifi/lookup/StringLookupService.java |  39 +++
 .../nifi-lookup-services-nar/pom.xml            |  35 ++
 .../src/main/resources/META-INF/NOTICE          |  73 ++++
 .../nifi-lookup-services/pom.xml                |  51 +++
 .../lookup/SimpleKeyValueLookupService.java     |  58 ++++
 .../nifi/lookup/maxmind/AnonymousIpSchema.java  |  35 ++
 .../apache/nifi/lookup/maxmind/CitySchema.java  |  55 +++
 .../nifi/lookup/maxmind/ContainerSchema.java    |  37 +++
 .../nifi/lookup/maxmind/DatabaseReader.java     | 252 ++++++++++++++
 .../nifi/lookup/maxmind/IPLookupService.java    | 332 +++++++++++++++++++
 .../apache/nifi/lookup/maxmind/IspSchema.java   |  34 ++
 ...org.apache.nifi.controller.ControllerService |  17 +
 .../additionalDetails.html                      | 102 ++++++
 .../nifi-lookup-services-bundle/pom.xml         |  28 ++
 .../serialization/RecordSetWriterFactory.java   |   5 +-
 .../apache/nifi/avro/AvroRecordSetWriter.java   |   8 +-
 .../org/apache/nifi/avro/WriteAvroResult.java   |  14 +-
 .../avro/WriteAvroResultWithExternalSchema.java |  77 ++---
 .../nifi/avro/WriteAvroResultWithSchema.java    |  63 ++--
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |   6 +-
 .../org/apache/nifi/csv/WriteCSVResult.java     |  89 ++---
 .../apache/nifi/json/JsonRecordSetWriter.java   |   6 +-
 .../org/apache/nifi/json/WriteJsonResult.java   |  75 ++---
 .../nifi/text/FreeFormTextRecordSetWriter.java  |   5 +-
 .../apache/nifi/text/FreeFormTextWriter.java    |  36 +-
 .../apache/nifi/avro/TestWriteAvroResult.java   |  27 +-
 .../avro/TestWriteAvroResultWithSchema.java     |   6 +-
 .../avro/TestWriteAvroResultWithoutSchema.java  |   6 +-
 .../org/apache/nifi/csv/TestWriteCSVResult.java |  57 ++--
 .../apache/nifi/json/TestWriteJsonResult.java   |  37 +--
 .../nifi-standard-services-api-nar/pom.xml      |   5 +
 nifi-nar-bundles/nifi-standard-services/pom.xml |   2 +
 pom.xml                                         |  16 +
 66 files changed, 2619 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
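
The lookup-service API files listed above (LookupService, StringLookupService, RecordLookupService, and the MaxMind-backed services) are added by this commit, but their sources are not included in this part of the series. As a loose illustration of the idea only, with an interface shape that is an assumption rather than the committed API, a key/value lookup boils down to something like:

    import java.util.Map;
    import java.util.Optional;

    // Hypothetical sketch; the real interfaces live in nifi-lookup-service-api.
    interface HypotheticalStringLookup {
        // Returns the value registered for the key, or empty if there is no match.
        Optional<String> lookup(String key);
    }

    class InMemoryLookup implements HypotheticalStringLookup {
        private final Map<String, String> values;

        InMemoryLookup(final Map<String, String> values) {
            this.values = values;
        }

        @Override
        public Optional<String> lookup(final String key) {
            return Optional.ofNullable(values.get(key));
        }
    }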


http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
index 6a27761..9762f3e 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
@@ -16,7 +16,11 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -28,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
     private ControllerServiceLookup serviceLookup;
     private ComponentLog logger;
     private StateManager stateManager;
+    private volatile ConfigurationContext configurationContext;
 
     @Override
     public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
@@ -75,4 +80,27 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
     protected StateManager getStateManager() {
         return stateManager;
     }
+
+    @OnEnabled
+    public final void abstractStoreConfigContext(final ConfigurationContext configContext) {
+        this.configurationContext = configContext;
+    }
+
+    @OnDisabled
+    public final void abstractClearConfigContext() {
+        this.configurationContext = null;
+    }
+
+    protected ConfigurationContext getConfigurationContext() {
+        final ConfigurationContext context = this.configurationContext;
+        if (context == null) {
+            throw new IllegalStateException("No Configuration Context exists");
+        }
+
+        return configurationContext;
+    }
+
+    protected PropertyValue getProperty(final PropertyDescriptor descriptor) {
+        return getConfigurationContext().getProperty(descriptor);
+    }
 }
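
With the ConfigurationContext captured at @OnEnabled time as shown above, subclasses can read property values without holding their own reference to the context. A minimal sketch of a service using the new getProperty(...) helper (MyCachingService and CACHE_SIZE are illustrative names, not part of this commit):

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.controller.AbstractControllerService;
    import org.apache.nifi.processor.util.StandardValidators;

    public class MyCachingService extends AbstractControllerService {
        static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("Cache Size")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

        public int getCacheSize() {
            // Delegates to the ConfigurationContext stored by the @OnEnabled callback,
            // so this throws IllegalStateException if the service is not enabled.
            return getProperty(CACHE_SIZE).asInteger();
        }
    }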

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 18cc689..5c7bff7 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -198,6 +198,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-services-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-poi-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
index 4447fed..b02deb4 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
@@ -103,7 +103,12 @@ public class StandardFieldValue implements FieldValue {
     public void updateValue(final Object newValue) {
         final Optional<Record> parentRecord = getParentRecord();
         if (!parentRecord.isPresent()) {
-            throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record");
+            if (value instanceof Record) {
+                ((Record) value).setValue(getField().getFieldName(), newValue);
+                return;
+            } else {
+                throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record");
+            }
         }
 
         parentRecord.get().setValue(getField().getFieldName(), newValue);
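
The fallback above makes updateValue usable on a standalone FieldValue whose value is itself a Record, such as the root of a record path: the named field is set directly on that record instead of throwing. A small sketch of the resulting behavior, assuming StandardFieldValue's value/field/parent constructor shape:

    import java.util.Collections;
    import java.util.HashMap;

    import org.apache.nifi.record.path.FieldValue;
    import org.apache.nifi.record.path.StandardFieldValue;
    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.MapRecord;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;

    class UpdateRootRecordExample {
        public static void main(final String[] args) {
            final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
            final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(nameField));
            final Record record = new MapRecord(schema, new HashMap<>());

            // No parent record, but the value is itself a Record, so the update
            // now writes through to it rather than throwing.
            final FieldValue fieldValue = new StandardFieldValue(record, nameField, null);
            fieldValue.updateValue("Jane");

            System.out.println(record.getValue("name")); // Jane
        }
    }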

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
new file mode 100644
index 0000000..5feb264
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public abstract class AbstractRecordSetWriter implements RecordSetWriter {
+    private final OutputStream out;
+    private int recordCount = 0;
+    private boolean activeRecordSet = false;
+
+    public AbstractRecordSetWriter(final OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.out.close();
+    }
+
+    @Override
+    public WriteResult write(final RecordSet recordSet) throws IOException {
+        beginRecordSet();
+        Record record;
+        while ((record = recordSet.next()) != null) {
+            write(record);
+            recordCount++;
+        }
+        return finishRecordSet();
+    }
+
+    protected OutputStream getOutputStream() {
+        return out;
+    }
+
+    protected final int getRecordCount() {
+        return recordCount;
+    }
+
+    protected final boolean isRecordSetActive() {
+        return activeRecordSet;
+    }
+
+    @Override
+    public final void beginRecordSet() throws IOException {
+        if (activeRecordSet) {
+            throw new IllegalStateException("Cannot begin a RecordSet because a RecordSet has already begun");
+        }
+
+        activeRecordSet = true;
+        onBeginRecordSet();
+    }
+
+    @Override
+    public final WriteResult finishRecordSet() throws IOException {
+        if (!isRecordSetActive()) {
+            throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
+        }
+
+        final Map<String, String> attributes = onFinishRecordSet();
+        return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes);
+    }
+
+    /**
+     * Method that is called as a result of {@link #beginRecordSet()} being called. This gives subclasses
+     * the chance to react to a new RecordSet beginning but prevents the subclass from changing how this
+     * implementation maintains its internal state. By default, this method does nothing.
+     *
+     * @throws IOException if unable to write the necessary data for a new RecordSet
+     */
+    protected void onBeginRecordSet() throws IOException {
+    }
+
+    /**
+     * Method that is called by {@link #finishRecordSet()} when a RecordSet is finished. This gives subclasses
+     * the chance to react to a RecordSet being completed but prevents the subclass from changing how this
+     * implementation maintains its internal state.
+     *
+     * @return a Map of key/value pairs that should be added to the FlowFile as attributes
+     */
+    protected Map<String, String> onFinishRecordSet() throws IOException {
+        return Collections.emptyMap();
+    }
+}
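
Subclasses of this base class implement only write(Record) and getMimeType(); the RecordSet loop, record counting, and begin/finish state handling come for free. A minimal hypothetical subclass (LineRecordSetWriter is an illustrative name):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    import org.apache.nifi.serialization.AbstractRecordSetWriter;
    import org.apache.nifi.serialization.WriteResult;
    import org.apache.nifi.serialization.record.Record;

    public class LineRecordSetWriter extends AbstractRecordSetWriter {

        public LineRecordSetWriter(final OutputStream out) {
            super(out);
        }

        @Override
        public WriteResult write(final Record record) throws IOException {
            // One record per line; the base class counts records when a whole
            // RecordSet is written through write(RecordSet).
            getOutputStream().write(record.toString().getBytes(StandardCharsets.UTF_8));
            getOutputStream().write('\n');
            return WriteResult.of(1, Collections.emptyMap());
        }

        @Override
        public String getMimeType() {
            return "text/plain";
        }
    }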

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
index 7d6fa1c..7c29cfe 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
@@ -37,9 +37,26 @@ public interface RecordSetWriter extends RecordWriter {
      * Writes the given result set to the given output stream
      *
      * @param recordSet the record set to serialize
-     * @param out the OutputStream to write to
+     *
+     * @return the results of writing the data
+     * @throws IOException if unable to write to the given OutputStream
+     */
+    WriteResult write(RecordSet recordSet) throws IOException;
+
+    /**
+     * Begins a new RecordSet
+     *
+     * @throws IOException if unable to write to the underlying OutputStream
+     * @throws IllegalStateException if a RecordSet has already been started
+     */
+    void beginRecordSet() throws IOException;
+
+    /**
+     * Finishes the currently active RecordSet and returns a WriteResult that includes information about what was written
+     *
      * @return the results of writing the data
      * @throws IOException if unable to write to the given OutputStream
+     * @throws IllegalStateException if a RecordSet has not been started via {@link #beginRecordSet()}
      */
-    WriteResult write(RecordSet recordSet, OutputStream out) throws IOException;
+    WriteResult finishRecordSet() throws IOException;
 }
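
For callers, write(RecordSet) is the one-shot path, while beginRecordSet()/finishRecordSet() support streaming records one at a time. Under the refactored factory signature shown in later hunks of this commit, the manual lifecycle looks like this sketch, assuming a writerFactory, reader, flowFile, and open OutputStream from the surrounding processor code (exception handling elided):

    // The writer is now Closeable and is bound to its OutputStream at creation.
    try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, out)) {
        writer.beginRecordSet();
        Record record;
        while ((record = reader.nextRecord()) != null) {
            writer.write(record);
        }
        final WriteResult result = writer.finishRecordSet();
        // result.getRecordCount() and result.getAttributes() feed FlowFile attributes.
    }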

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index aa298d9..720953c 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -17,21 +17,20 @@
 
 package org.apache.nifi.serialization;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.apache.nifi.serialization.record.Record;
 
-public interface RecordWriter {
+public interface RecordWriter extends Closeable {
     /**
      * Writes the given result set to the given output stream
      *
      * @param record the record set to serialize
-     * @param out the OutputStream to write to
      * @return the results of writing the data
      * @throws IOException if unable to write to the given OutputStream
      */
-    WriteResult write(Record record, OutputStream out) throws IOException;
+    WriteResult write(Record record) throws IOException;
 
     /**
      * @return the MIME Type that the Result Set Writer produces. This will be added to FlowFiles using

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 8883965..7cc6bb5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -188,12 +188,14 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
 
                 final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
                 final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
-                final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema);
 
                 final StopWatch stopWatch = new StopWatch(true);
 
                 // use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
                 child = session.create(originalFlowFile);
+
+                final FlowFile writableFlowFile = child;
+                final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
                 child = session.write(child, (final OutputStream rawOut) -> {
                     try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
                          final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
@@ -212,7 +214,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
                             }
                         };
 
-                        writeResult.set(recordSetWriter.write(recordSet, out));
+                        try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
+                            writeResult.set(recordSetWriter.write(recordSet));
+                            mimeTypeRef.set(recordSetWriter.getMimeType());
+                        }
                     } catch (Exception e) {
                         exceptionHolder.set(e);
                     }
@@ -230,7 +235,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
 
                 final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
                 attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
-                attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                 successFlowFile = session.putAllAttributes(successFlowFile, attributes);
 
                 final URI uri = path.toUri();
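
The shape of this change, creating the writer inside the session.write callback and carrying the MIME type out through an AtomicReference, recurs in several processors touched by this commit. In outline, assuming the session, factory, schema, and record set from the surrounding code:

    final FlowFile writable = child;
    final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
    final AtomicReference<String> mimeTypeRef = new AtomicReference<>();

    child = session.write(child, rawOut -> {
        try (final OutputStream out = new BufferedOutputStream(rawOut);
             final RecordSetWriter writer = recordSetWriterFactory.createWriter(getLogger(), schema, writable, out)) {
            writeResult.set(writer.write(recordSet));
            // The MIME type is only known once the writer exists, so capture it here.
            mimeTypeRef.set(writer.getMimeType());
        } catch (final SchemaNotFoundException e) {
            throw new ProcessException(e);
        }
    });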

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
index e3ed23e..251eb46 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
@@ -42,7 +42,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
 public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
     private final List<Object[]> records = new ArrayList<>();
     private final List<RecordField> fields = new ArrayList<>();
-    private final int failAfterN;
+    private int failAfterN;
 
     public MockRecordParser() {
         this(-1);
@@ -52,6 +52,9 @@ public class MockRecordParser extends AbstractControllerService implements Recor
         this.failAfterN = failAfterN;
     }
 
+    public void failAfter(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
 
     public void addSchemaField(final String fieldName, final RecordFieldType type) {
         fields.add(new RecordField(fieldName, type.getDataType()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index b4253ee..525a51f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -36,6 +36,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     private final int failAfterN;
     private final boolean quoteValues;
 
+    public MockRecordWriter() {
+        this(null);
+    }
+
     public MockRecordWriter(final String header) {
         this(header, true, -1);
     }
@@ -56,12 +60,16 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
         return new RecordSetWriter() {
+            private int recordCount = 0;
+
             @Override
-            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
-                out.write(header.getBytes());
-                out.write("\n".getBytes());
+            public WriteResult write(final RecordSet rs) throws IOException {
+                if (header != null) {
+                    out.write(header.getBytes());
+                    out.write("\n".getBytes());
+                }
 
                 int recordCount = 0;
                 Record record = null;
@@ -75,14 +83,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
                     int i = 0;
                     for (final String fieldName : record.getSchema().getFieldNames()) {
                         final String val = record.getAsString(fieldName);
-                        if (quoteValues) {
-                            out.write("\"".getBytes());
-                            if (val != null) {
+                        if (val != null) {
+                            if (quoteValues) {
+                                out.write("\"".getBytes());
+                                out.write(val.getBytes());
+                                out.write("\"".getBytes());
+                            } else {
                                 out.write(val.getBytes());
                             }
-                            out.write("\"".getBytes());
-                        } else if (val != null) {
-                            out.write(val.getBytes());
                         }
 
                         if (i++ < numCols - 1) {
@@ -101,20 +109,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
             }
 
             @Override
-            public WriteResult write(Record record, OutputStream out) throws IOException {
-                out.write(header.getBytes());
-                out.write("\n".getBytes());
+            public WriteResult write(Record record) throws IOException {
+                if (header != null) {
+                    out.write(header.getBytes());
+                    out.write("\n".getBytes());
+                }
 
                 final int numCols = record.getSchema().getFieldCount();
                 int i = 0;
                 for (final String fieldName : record.getSchema().getFieldNames()) {
                     final String val = record.getAsString(fieldName);
-                    if (quoteValues) {
-                        out.write("\"".getBytes());
-                        out.write(val.getBytes());
-                        out.write("\"".getBytes());
-                    } else {
-                        out.write(val.getBytes());
+                    if (val != null) {
+                        if (quoteValues) {
+                            out.write("\"".getBytes());
+                            out.write(val.getBytes());
+                            out.write("\"".getBytes());
+                        } else {
+                            out.write(val.getBytes());
+                        }
                     }
 
                     if (i++ < numCols - 1) {
@@ -123,8 +135,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
                 }
                 out.write("\n".getBytes());
 
+                recordCount++;
+
                 return WriteResult.of(1, Collections.emptyMap());
             }
+
+            @Override
+            public void close() throws IOException {
+                out.close();
+            }
+
+            @Override
+            public void beginRecordSet() throws IOException {
+            }
+
+            @Override
+            public WriteResult finishRecordSet() throws IOException {
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
         };
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index b307930..62326a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2460,19 +2460,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                 @Override
                 public void close() throws IOException {
+                    if (closed) {
+                        return;
+                    }
+
+                    closed = true;
                     writeRecursionSet.remove(sourceFlowFile);
 
                     final long bytesWritten = countingOut.getBytesWritten();
-                    if (!closed) {
-                        StandardProcessSession.this.bytesWritten += bytesWritten;
-                        closed = true;
-                    }
+                    StandardProcessSession.this.bytesWritten += bytesWritten;
 
                     openOutputStreams.remove(sourceFlowFile);
 
+                    flush();
                     removeTemporaryClaim(record);
 
-                    flush();
                     final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
                         .fromFlowFile(record.getCurrent())
                         .contentClaim(updatedClaim)
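
The reordering above turns close() into an idempotent operation: the closed flag is checked and set before any side effects, and the stream is flushed before the temporary claim is removed. The general shape of such a guard, as a standalone sketch:

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // A second close() returns immediately, so bytes cannot be double-counted
    // and cleanup cannot run twice.
    class IdempotentCloseStream extends FilterOutputStream {
        private boolean closed = false;
        private long bytesWritten = 0;

        IdempotentCloseStream(final OutputStream out) {
            super(out);
        }

        @Override
        public void write(final int b) throws IOException {
            out.write(b);
            bytesWritten++;
        }

        long getBytesWritten() {
            return bytesWritten;
        }

        @Override
        public void close() throws IOException {
            if (closed) {
                return;
            }
            closed = true;

            flush();       // flush exactly once, before any cleanup
            super.close();
        }
    }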

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 1ccce07..563ece6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -413,16 +413,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
         FlowFile flowFile = session.create();
         try {
-            final RecordSetWriter writer;
+            final RecordSchema schema;
+
             try {
-                final RecordSchema schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
-                writer = writerFactory.createWriter(logger, schema);
+                schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
             } catch (final Exception e) {
-                logger.error(
-                    "Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "
-                        + "Record Writer cannot obtain the appropriate Schema, due to failure to connect to a remote Schema Registry "
-                        + "or due to the Schema Access Strategy being dependent upon FlowFile Attributes that are not available. "
-                        + "Will roll back the Kafka message offsets.", e);
+                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
 
                 try {
                     rollback(topicPartition);
@@ -436,6 +432,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
             final FlowFile ff = flowFile;
             final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
+            final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
 
             flowFile = session.write(flowFile, rawOut -> {
                 final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
@@ -479,15 +476,28 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     }
                 };
 
-                try (final OutputStream out = new BufferedOutputStream(rawOut)) {
-                    writeResult.set(writer.write(recordSet, out));
+                try (final OutputStream out = new BufferedOutputStream(rawOut);
+                    final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) {
+                    writeResult.set(writer.write(recordSet));
+                    mimeTypeRef.set(writer.getMimeType());
+                } catch (final Exception e) {
+                    logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e);
+
+                    try {
+                        rollback(topicPartition);
+                    } catch (final Exception rollbackException) {
+                        logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                    }
+
+                    yield();
+                    throw new ProcessException(e);
                 }
             });
 
             final WriteResult result = writeResult.get();
             if (result.getRecordCount() > 0) {
                 final Map<String, String> attributes = new HashMap<>(result.getAttributes());
-                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
                 attributes.put("record.count", String.valueOf(result.getRecordCount()));
 
                 attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
index 442ccc5..e48568b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -58,7 +58,6 @@ import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RecordWriter;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
@@ -324,14 +323,13 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
                 final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                 final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
 
-                final RecordWriter writer;
-                try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
-                    final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-                    final RecordSchema schema = writerFactory.getSchema(flowFile, in);
+                final RecordSchema schema;
+                final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-                    writer = writerFactory.createWriter(getLogger(), schema);
+                try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
+                    schema = writerFactory.getSchema(flowFile, in);
                 } catch (final Exception e) {
-                    getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e});
+                    getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e});
                     session.transfer(flowFile, REL_FAILURE);
                     continue;
                 }
@@ -342,7 +340,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
                         public void process(final InputStream rawIn) throws IOException {
                             try (final InputStream in = new BufferedInputStream(rawIn)) {
                                 final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
-                                lease.publish(flowFile, reader, writer, messageKeyField, topic);
+                                lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic);
                             } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                 throw new ProcessException(e);
                             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index be2697b..4b3a3ae 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -32,9 +32,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
@@ -93,7 +96,8 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException {
+    void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+        final String messageKeyField, final String topic) throws IOException {
         if (tracker == null) {
             tracker = new InFlightMessageTracker();
         }
@@ -104,11 +108,11 @@ public class PublisherLease implements Closeable {
         final RecordSet recordSet = reader.createRecordSet();
         int recordCount = 0;
 
-        try {
+        try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
-                writer.write(record, baos);
+                writer.write(record);
 
                 final byte[] messageContent = baos.toByteArray();
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
@@ -127,6 +131,8 @@ public class PublisherLease implements Closeable {
             }
         } catch (final TokenTooLargeException ttle) {
             tracker.fail(flowFile, ttle);
+        } catch (final SchemaNotFoundException snfe) {
+            throw new IOException(snfe);
         } catch (final Exception e) {
             tracker.fail(flowFile, e);
             poison();
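
Each record is serialized into a single reusable in-memory buffer that is reset per record, so every record becomes exactly one Kafka message. Schematically, with the reader, factory, and tracker coming from the surrounding class:

    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
        Record record;
        while ((record = recordSet.next()) != null) {
            baos.reset();                       // reuse the buffer for each record
            writer.write(record);

            final byte[] messageContent = baos.toByteArray();
            final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
            // messageContent and key become one ProducerRecord<byte[], byte[]> handed
            // to the KafkaProducer; the tracker records success or failure per message.
        }
    }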

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
index 8c6efb7..7cff2a7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -44,8 +44,8 @@ import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -65,7 +65,8 @@ public class TestPublishKafkaRecord_0_10 {
     public void setup() throws InitializationException, IOException {
         mockPool = mock(PublisherPool.class);
         mockLease = mock(PublisherLease.class);
-        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class));
+        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class),
+            any(RecordSchema.class), any(String.class), any(String.class));
 
         when(mockPool.obtainPublisher()).thenReturn(mockLease);
 
@@ -103,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -122,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -137,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -154,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -176,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
 
-        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
@@ -206,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -240,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 {
         runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
         runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
 
-        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index a1abda4..27df57b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -58,10 +58,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
+    public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
         return new RecordSetWriter() {
             @Override
-            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+            public WriteResult write(final RecordSet rs) throws IOException {
                 out.write(header.getBytes());
                 out.write("\n".getBytes());
 
@@ -102,7 +102,20 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
             }
 
             @Override
-            public WriteResult write(Record record, OutputStream out) throws IOException {
+            public WriteResult write(Record record) throws IOException {
+                return null;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public void beginRecordSet() throws IOException {
+            }
+
+            @Override
+            public WriteResult finishRecordSet() throws IOException {
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 5b4819a..6ecfa59 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -217,11 +217,11 @@ public class FetchParquetTest {
         configure(proc);
 
         final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
-        when(recordSetWriter.write(any(RecordSet.class), any(OutputStream.class))).thenThrow(new IOException("IOException"));
+        when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException"));
 
         final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
         when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
-        when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class))).thenReturn(recordSetWriter);
+        when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(FlowFile.class), any(OutputStream.class))).thenReturn(recordSetWriter);
 
         testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
         testRunner.enableControllerService(recordSetWriterFactory);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
index 32b700f..904aab6 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
@@ -18,9 +18,11 @@
 package org.apache.nifi.schemaregistry.services;
 
 import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 public class AvroSchemaValidator implements Validator {
 
@@ -36,7 +38,8 @@ public class AvroSchemaValidator implements Validator {
         }
 
         try {
-            new Schema.Parser().parse(input);
+            final Schema avroSchema = new Schema.Parser().parse(input);
+            AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY);
 
             return new ValidationResult.Builder()
                 .input(input)

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index 1dde047..b18e9de 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -34,6 +34,7 @@ import javax.script.Invocable;
 import javax.script.ScriptException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Collection;
 import java.util.HashSet;
@@ -52,15 +53,12 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
         super.onEnabled(context);
     }
 
-    public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException {
-        return createWriter(logger, getSchema(flowFile, in));
-    }
 
     @Override
-    public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException {
+    public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
         if (recordFactory.get() != null) {
             try {
-                return recordFactory.get().createWriter(logger, schema);
+                return recordFactory.get().createWriter(logger, schema, flowFile, out);
             } catch (UndeclaredThrowableException ute) {
                 throw new IOException(ute.getCause());
             }

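As elsewhere in this refactoring, createWriter(...) now receives the FlowFile and the destination OutputStream, so the returned writer is bound to its stream up front and the old two-argument overload is gone. A sketch of the new call shape; the helper class and its parameters are illustrative:

    import java.io.ByteArrayOutputStream;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriterFactory;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class CreateWriterSketch {
        // The factory binds the writer to 'out'; later write(...) calls
        // no longer take an OutputStream argument.
        public static RecordSetWriter open(final RecordSetWriterFactory factory, final ComponentLog logger,
                final RecordSchema schema, final FlowFile flowFile) throws Exception {
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            return factory.createWriter(logger, schema, flowFile, out);
        }
    }
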
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
index 2e1c03d..96fda19 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
@@ -104,7 +104,9 @@ class ScriptedRecordSetWriterTest {
         InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
 
 		def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream)
-        RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema)
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
+        RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream)
         assertNotNull(recordSetWriter)
 
         def recordSchema = new SimpleRecordSchema(
@@ -119,8 +121,7 @@ class ScriptedRecordSetWriterTest {
                 new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
         ] as MapRecord[]
 
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
-        recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream)
+        recordSetWriter.write(RecordSet.of(recordSchema, records))
 
         def xml = new XmlSlurper().parseText(outputStream.toString())
         assertEquals('1', xml.record[0].id.toString())

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
index 4fae4fe..b0daaca 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -34,9 +34,15 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream
 
 
 class GroovyRecordSetWriter implements RecordSetWriter {
-
+    private int recordCount = 0;
+    private final OutputStream out;
+    
+    public GroovyRecordSetWriter(final OutputStream out) {
+        this.out = out;
+    }
+    
     @Override
-    WriteResult write(Record r, OutputStream out) throws IOException {
+    WriteResult write(Record r) throws IOException {
         new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
             new MarkupBuilder(osw).record {
                 r.schema.fieldNames.each {fieldName ->
@@ -44,7 +50,9 @@ class GroovyRecordSetWriter implements RecordSetWriter {
                 }
             }
         }
-        WriteResult.of(0, [:])
+        
+        recordCount++;
+        WriteResult.of(1, [:])
     }
 
     @Override
@@ -53,10 +61,10 @@ class GroovyRecordSetWriter implements RecordSetWriter {
     }
 
     @Override
-    WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+    WriteResult write(final RecordSet rs) throws IOException {
         int count = 0
 
-        new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw ->
+        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
             new MarkupBuilder(osw).recordSet {
 
                 Record r
@@ -73,6 +81,18 @@ class GroovyRecordSetWriter implements RecordSetWriter {
         }
         WriteResult.of(count, [:])
     }
+    
+    public void beginRecordSet() throws IOException {
+    }
+    
+    @Override
+    public WriteResult finishRecordSet() throws IOException {
+        return WriteResult.of(recordCount, [:]);
+    }
+    
+    @Override
+    public void close() throws IOException {
+    }
 }
 
 class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@@ -83,9 +103,10 @@ class GroovyRecordSetWriterFactory extends AbstractControllerService implements
     }
     
     @Override
-    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException {
-        return new GroovyRecordSetWriter()
+    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
+        return new GroovyRecordSetWriter(out)
     }
+    
 }
 
 writer = new GroovyRecordSetWriterFactory()

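The Groovy writer above demonstrates the full refactored contract: the stream is supplied at construction, and a record set is framed by beginRecordSet() and finishRecordSet(). A generic sketch of that call sequence in Java; the helper method is illustrative:

    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.WriteResult;
    import org.apache.nifi.serialization.record.Record;

    public class WriterLifecycleSketch {
        public static WriteResult writeAll(final RecordSetWriter writer, final Iterable<Record> records)
                throws Exception {
            writer.beginRecordSet();                  // start the set on the writer's own stream
            for (final Record record : records) {
                writer.write(record);                 // per-record write, no stream argument
            }
            final WriteResult result = writer.finishRecordSet(); // total count plus attributes
            writer.close();
            return result;
        }
    }
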
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index a86c836..a0932e0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -42,6 +42,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-flowfile-packager</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index b6cc83b..52fcbb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -27,7 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -101,26 +101,27 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-        final RecordSetWriter writer;
         final RecordSchema writeSchema;
         try (final InputStream rawIn = session.read(flowFile);
             final InputStream in = new BufferedInputStream(rawIn)) {
             writeSchema = writerFactory.getSchema(flowFile, in);
-            writer = writerFactory.createWriter(getLogger(), writeSchema);
         } catch (final Exception e) {
-            getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e});
+            getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
-        final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
 
         final FlowFile original = flowFile;
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream in, final OutputStream out) throws IOException {
+
                     try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+                        final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out);
 
                         final RecordSet recordSet = new RecordSet() {
                             @Override
@@ -151,8 +152,11 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
                             }
                         };
 
-                        final WriteResult writeResult = writer.write(recordSet, out);
-                        writeResultRef.set(writeResult);
+                        final WriteResult writeResult = writer.write(recordSet);
+                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                        attributes.putAll(writeResult.getAttributes());
+                        recordCount.set(writeResult.getRecordCount());
 
                     } catch (final SchemaNotFoundException | MalformedRecordException e) {
                         throw new ProcessException("Could not parse incoming data", e);
@@ -160,22 +164,17 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
                 }
             });
         } catch (final Exception e) {
-            getLogger().error("Failed to convert {}", new Object[] {flowFile, e});
+            getLogger().error("Failed to process {}", new Object[] {flowFile, e});
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
-        final WriteResult writeResult = writeResultRef.get();
-
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-        attributes.putAll(writeResult.getAttributes());
-
         flowFile = session.putAllAttributes(flowFile, attributes);
         session.transfer(flowFile, REL_SUCCESS);
-        session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
-        getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile});
+
+        final int count = recordCount.get();
+        session.adjustCounter("Records Processed", count, false);
+        getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile});
     }
 
     protected abstract Record process(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context);

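The net effect of this change: the writer is created inside the StreamCallback against the session-supplied OutputStream, so the WriteResult no longer has to be smuggled out through an AtomicReference. A condensed sketch of the pattern as a standalone helper, assuming the writer is closeable via try-with-resources; in the processor itself the two streams come from the session:

    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.RecordReaderFactory;
    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriterFactory;
    import org.apache.nifi.serialization.WriteResult;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class StreamingTransformSketch {
        public static WriteResult copyRecords(final RecordReaderFactory readerFactory,
                final RecordSetWriterFactory writerFactory, final RecordSchema writeSchema,
                final FlowFile flowFile, final InputStream in, final OutputStream out,
                final ComponentLog logger) throws Exception {
            try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, logger);
                 final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, out)) {
                writer.beginRecordSet();
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    writer.write(record);
                }
                return writer.finishRecordSet(); // supplies record.count and writer attributes
            }
        }
    }
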
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
new file mode 100644
index 0000000..955023f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
+            + "the unchanged FlowFile will be routed to this relationship")
+        .build();
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        if (isRouteOriginal()) {
+            relationships.add(REL_ORIGINAL);
+        }
+
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final T flowFileContext = getFlowFileContext(flowFile, context);
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordSchema writeSchema;
+        try (final InputStream rawIn = session.read(flowFile);
+            final InputStream in = new BufferedInputStream(rawIn)) {
+            writeSchema = writerFactory.getSchema(flowFile, in);
+        } catch (final Exception e) {
+            getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final AtomicInteger numRecords = new AtomicInteger(0);
+        final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
+        final FlowFile original = flowFile;
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+                            final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
+                            numRecords.incrementAndGet();
+
+                            for (final Relationship relationship : relationships) {
+                                final RecordSetWriter recordSetWriter;
+                                Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
+                                if (tuple == null) {
+                                    FlowFile outFlowFile = session.create(original);
+                                    final OutputStream out = session.write(outFlowFile);
+                                    recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, original, out);
+                                    recordSetWriter.beginRecordSet();
+
+                                    tuple = new Tuple<>(outFlowFile, recordSetWriter);
+                                    writers.put(relationship, tuple);
+                                } else {
+                                    recordSetWriter = tuple.getValue();
+                                }
+
+                                recordSetWriter.write(record);
+                            }
+                        }
+                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                        throw new ProcessException("Could not parse incoming data", e);
+                    }
+                }
+            });
+
+            for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) {
+                final Relationship relationship = entry.getKey();
+                final Tuple<FlowFile, RecordSetWriter> tuple = entry.getValue();
+                final RecordSetWriter writer = tuple.getValue();
+                FlowFile childFlowFile = tuple.getKey();
+
+                final WriteResult writeResult = writer.finishRecordSet();
+
+                try {
+                    writer.close();
+                } catch (final IOException ioe) {
+                    getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
+                }
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+
+                childFlowFile = session.putAllAttributes(childFlowFile, attributes);
+                session.transfer(childFlowFile, relationship);
+                session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
+                session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
+
+                session.getProvenanceReporter().route(childFlowFile, relationship);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to process {}", new Object[] {flowFile, e});
+
+            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+                try {
+                    tuple.getValue().close();
+                } catch (final Exception e1) {
+                    getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[] {tuple.getKey()});
+                }
+
+                session.remove(tuple.getKey());
+            }
+
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        } finally {
+            for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+                final RecordSetWriter writer = tuple.getValue();
+                try {
+                    writer.close();
+                } catch (final Exception e) {
+                    getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[] {tuple.getKey(), e});
+                }
+            }
+        }
+
+        if (isRouteOriginal()) {
+            flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords.get()));
+            session.transfer(flowFile, REL_ORIGINAL);
+        } else {
+            session.remove(flowFile);
+        }
+
+        getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords});
+    }
+
+    protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);
+
+    protected abstract boolean isRouteOriginal();
+
+    protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context);
+}
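
AbstractRouteRecord leaves three hooks to subclasses: route(...) returns the relationships (possibly none) for each record, isRouteOriginal() decides whether the incoming FlowFile is kept and sent to 'original', and getFlowFileContext(...) computes per-FlowFile state once before reading. A hypothetical minimal subclass; the 'matched' relationship and the 'id' field are illustrative, and LookupRecord is the real subclass this commit introduces:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class RouteOnIdSketch extends AbstractRouteRecord<Void> {
        static final Relationship REL_MATCHED = new Relationship.Builder()
            .name("matched")
            .description("Records whose 'id' field is non-null")
            .build();

        @Override
        public Set<Relationship> getRelationships() {
            final Set<Relationship> relationships = new HashSet<>(super.getRelationships());
            relationships.add(REL_MATCHED);
            return relationships;
        }

        @Override
        protected Set<Relationship> route(final Record record, final RecordSchema writeSchema,
                final FlowFile flowFile, final ProcessContext context, final Void flowFileContext) {
            // Records routed to no relationship are counted but written nowhere.
            return record.getValue("id") == null
                ? Collections.<Relationship>emptySet()
                : Collections.singleton(REL_MATCHED);
        }

        @Override
        protected boolean isRouteOriginal() {
            return true; // keep the incoming FlowFile and transfer it to 'original'
        }

        @Override
        protected Void getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
            return null; // no per-FlowFile state needed for this sketch
        }
    }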