You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/27 16:52:03 UTC

nifi git commit: NIFI-3659: This closes #1707. Added Processor to Split a FlowFile consisting of Record-oriented data into multiple FlowFiles, each containing a subset of the original FlowFile's records

Repository: nifi
Updated Branches:
  refs/heads/master 55b8c7dda -> a1bffbcc8


NIFI-3659: This closes #1707. Added Processor to Split a FlowFile consisting of Record-oriented data into multiple FlowFiles, each containing a subset of the original FlowFile's records

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a1bffbcc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a1bffbcc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a1bffbcc

Branch: refs/heads/master
Commit: a1bffbcc872c009e4de477f72a29ebe33310ec3f
Parents: 55b8c7d
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Apr 12 15:22:53 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Thu Apr 27 12:51:01 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitRecord.java   | 206 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestSplitRecord.java    | 180 ++++++++++++++++
 .../standard/util/record/MockRecordWriter.java  |  23 ++-
 .../serialization/record/PushBackRecordSet.java |  67 ++++++
 .../nifi/serialization/record/RecordSet.java    |  38 ++++
 6 files changed, 514 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bffbcc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
new file mode 100644
index 0000000..05aa98c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@SideEffectFree
+@Tags({"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."),
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship.")
+})
+@CapabilityDescription("Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles")
+public class SplitRecord extends AbstractProcessor {
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("Record Reader")
+        .description("Specifies the Controller Service to use for reading incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("Record Writer")
+        .description("Specifies the Controller Service to use for writing out the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORDS_PER_SPLIT = new PropertyDescriptor.Builder()
+        .name("Records Per Split")
+        .description("Specifies how many records should be written to each 'split' or 'segment' FlowFile")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+
+    static final Relationship REL_SPLITS = new Relationship.Builder()
+        .name("splits")
+        .description("The individual 'segments' of the original FlowFile will be routed to this relationship.")
+        .build();
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("Upon successfully splitting an input FlowFile, the original FlowFile will be sent to this relationship.")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
+            + "the unchanged FlowFile will be routed to this relationship.")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(RECORDS_PER_SPLIT);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SPLITS);
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordSetWriter writer;
+        try (final InputStream rawIn = session.read(original);
+            final InputStream in = new BufferedInputStream(rawIn)) {
+            writer = writerFactory.createWriter(getLogger(), original, in);
+        } catch (final Exception e) {
+            getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e});
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();
+
+        final List<FlowFile> splits = new ArrayList<>();
+        try {
+            session.read(original, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+
+                        final RecordSet recordSet = reader.createRecordSet();
+                        final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
+
+                        while (pushbackSet.isAnotherRecord()) {
+                            FlowFile split = session.create(original);
+
+                            try {
+                                final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
+                                split = session.write(split, new OutputStreamCallback() {
+                                    @Override
+                                    public void process(final OutputStream out) throws IOException {
+                                        if (maxRecords == 1) {
+                                            final Record record = pushbackSet.next();
+                                            writeResultRef.set(writer.write(record, out));
+                                        } else {
+                                            final RecordSet limitedSet = pushbackSet.limit(maxRecords);
+                                            writeResultRef.set(writer.write(limitedSet, out));
+                                        }
+                                    }
+                                });
+
+                                final WriteResult writeResult = writeResultRef.get();
+
+                                final Map<String, String> attributes = new HashMap<>();
+                                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                attributes.putAll(writeResult.getAttributes());
+
+                                session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
+                                split = session.putAllAttributes(split, attributes);
+                            } finally {
+                                splits.add(split);
+                            }
+                        }
+                    } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                        throw new ProcessException("Failed to parse incoming data", e);
+                    }
+                }
+            });
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to split {}", new Object[] {original, pe});
+            session.remove(splits);
+            session.transfer(original, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(original, REL_ORIGINAL);
+        session.transfer(splits, REL_SPLITS);
+        getLogger().info("Successfully split {} into {} FlowFiles, each containing up to {} records", new Object[] {original, splits.size(), maxRecords});
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bffbcc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index f82c637..b4085c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -88,6 +88,7 @@ org.apache.nifi.processors.standard.ScanContent
 org.apache.nifi.processors.standard.SegmentContent
 org.apache.nifi.processors.standard.SplitContent
 org.apache.nifi.processors.standard.SplitJson
+org.apache.nifi.processors.standard.SplitRecord
 org.apache.nifi.processors.standard.SplitText
 org.apache.nifi.processors.standard.SplitXml
 org.apache.nifi.processors.standard.TailFile

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bffbcc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
new file mode 100644
index 0000000..4c3bff4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SplitRecord} that use a mock record reader and a mock record writer
+ * (header line "header", values not quoted) against three fixed input records.
+ */
+public class TestSplitRecord {
+
+    /**
+     * Builds a TestRunner for {@link SplitRecord} wired to the given reader and a fresh mock writer,
+     * and loads the reader with the name/age schema and the three records every test uses.
+     *
+     * @param readerService the mock reader to register under the id "reader"
+     * @param recordsPerSplit the value to set for the 'Records Per Split' property
+     * @return a runner that is fully configured but has no FlowFile enqueued yet
+     * @throws InitializationException if a controller service cannot be added to the runner
+     */
+    private static TestRunner createRunner(final MockRecordParser readerService, final String recordsPerSplit) throws InitializationException {
+        final MockRecordWriter writerService = new MockRecordWriter("header", false);
+
+        final TestRunner runner = TestRunners.newTestRunner(SplitRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(SplitRecord.RECORD_READER, "reader");
+        runner.setProperty(SplitRecord.RECORD_WRITER, "writer");
+        runner.setProperty(SplitRecord.RECORDS_PER_SPLIT, recordsPerSplit);
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        return runner;
+    }
+
+    @Test
+    public void testIndividualRecordPerSplit() throws InitializationException {
+        final TestRunner runner = createRunner(new MockRecordParser(), "1");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SplitRecord.REL_SPLITS, 3);
+        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
+        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);
+
+        for (final MockFlowFile mff : out) {
+            mff.assertAttributeEquals("record.count", "1");
+            mff.assertAttributeEquals("mime.type", "text/plain");
+        }
+
+        // Each record must appear in exactly one split; the order of the splits is not asserted.
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\n")).count());
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJane Doe,47\n")).count());
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());
+    }
+
+    @Test
+    public void testMultipleRecordsPerSplit() throws InitializationException {
+        final TestRunner runner = createRunner(new MockRecordParser(), "2");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SplitRecord.REL_SPLITS, 2);
+        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
+        final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);
+
+        // Three records with a split size of two: one full split and one remainder split.
+        assertEquals(1, out.stream().filter(mff -> mff.getAttribute("record.count").equals("2")).count());
+        assertEquals(1, out.stream().filter(mff -> mff.getAttribute("record.count").equals("1")).count());
+        assertTrue(out.stream().allMatch(mff -> mff.getAttribute("mime.type").equals("text/plain")));
+
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\nJane Doe,47\n")).count());
+        assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());
+    }
+
+    @Test
+    public void testAllSplitsOneDesintation() throws InitializationException {
+        final TestRunner runner = createRunner(new MockRecordParser(), "3");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(SplitRecord.REL_SPLITS, 1);
+        runner.assertTransferCount(SplitRecord.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+
+        out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy Doe,14\n");
+    }
+
+
+    @Test
+    public void testReadFailure() throws InitializationException {
+        // NOTE(review): the constructor argument presumably makes the mock reader fail after two records - confirm against MockRecordParser.
+        final TestRunner runner = createRunner(new MockRecordParser(2), "1");
+
+        final MockFlowFile original = runner.enqueue("");
+        runner.run();
+
+        // Nothing may reach 'splits' or 'original'; the very same input FlowFile must arrive at 'failure'.
+        runner.assertAllFlowFilesTransferred(SplitRecord.REL_FAILURE, 1);
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(SplitRecord.REL_FAILURE).get(0);
+        assertTrue("Expected the original FlowFile instance to be routed to failure", original == failed);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bffbcc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
index 1dbfd04..ca16bcd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
@@ -96,7 +96,28 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
 
             @Override
             public WriteResult write(Record record, OutputStream out) throws IOException {
-                return null;
+                // Writes the header line followed by a single comma-separated line holding the record's
+                // field values in schema order, then reports that exactly one record was written.
+                // An explicit charset keeps the produced bytes independent of the platform default.
+                final java.nio.charset.Charset charset = java.nio.charset.StandardCharsets.UTF_8;
+
+                out.write(header.getBytes(charset));
+                out.write("\n".getBytes(charset));
+
+                final int numCols = record.getSchema().getFieldCount();
+                int i = 0;
+                for (final String fieldName : record.getSchema().getFieldNames()) {
+                    final String val = record.getAsString(fieldName);
+                    // A field without a value is emitted as an empty column instead of failing with a NullPointerException.
+                    if (val != null) {
+                        if (quoteValues) {
+                            out.write("\"".getBytes(charset));
+                            out.write(val.getBytes(charset));
+                            out.write("\"".getBytes(charset));
+                        } else {
+                            out.write(val.getBytes(charset));
+                        }
+                    }
+
+                    // Separator goes between columns only, never after the last one.
+                    if (i++ < numCols - 1) {
+                        out.write(",".getBytes(charset));
+                    }
+                }
+                out.write("\n".getBytes(charset));
+
+                return WriteResult.of(1, Collections.emptyMap());
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bffbcc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
new file mode 100644
index 0000000..a186611
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/PushBackRecordSet.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import java.io.IOException;
+
+/**
+ * A {@link RecordSet} decorator that can hold back a single {@link Record} so that it is
+ * returned again by the following call to {@link #next()}. This makes it possible to check
+ * whether more records are available without losing the record that was read to find out.
+ */
+public class PushBackRecordSet implements RecordSet {
+    private final RecordSet original;
+    private Record pushback;
+
+    /**
+     * @param original the RecordSet whose records are served by this wrapper
+     */
+    public PushBackRecordSet(final RecordSet original) {
+        this.original = original;
+    }
+
+    /**
+     * @return the schema reported by the wrapped RecordSet
+     */
+    @Override
+    public RecordSchema getSchema() throws IOException {
+        return original.getSchema();
+    }
+
+    /**
+     * Returns the held-back record if there is one (clearing it), otherwise the next record of
+     * the wrapped RecordSet.
+     */
+    @Override
+    public Record next() throws IOException {
+        if (pushback == null) {
+            return original.next();
+        }
+
+        final Record held = pushback;
+        pushback = null;
+        return held;
+    }
+
+    /**
+     * Holds the given record back so that the next call to {@link #next()} returns it.
+     *
+     * @param record the record to hand out again
+     * @throws IllegalStateException if a record is already being held back
+     */
+    public void pushback(final Record record) {
+        if (pushback == null) {
+            this.pushback = record;
+            return;
+        }
+
+        throw new IllegalStateException("RecordSet already has a Record pushed back. Cannot push back more than one record at a time.");
+    }
+
+    /**
+     * Reports whether {@link #next()} would return a record, reading one ahead and holding it
+     * back when nothing is held yet.
+     *
+     * @return {@code true} if a record is held back or could be read, {@code false} otherwise
+     */
+    public boolean isAnotherRecord() throws IOException {
+        if (pushback != null) {
+            return true;
+        }
+
+        final Record peeked = next();
+        final boolean available = peeked != null;
+        if (available) {
+            pushback(peeked);
+        }
+
+        return available;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a1bffbcc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
index 25bbcdc..9e67346 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/RecordSet.java
@@ -31,6 +31,44 @@ public interface RecordSet {
      */
     Record next() throws IOException;
 
+    /**
+     * Creates a bounded view over this RecordSet that hands out at most {@code maxRecords} records.
+     * The view shares its cursor with this RecordSet: every Record taken from the view is consumed
+     * from this RecordSet as well, and vice versa.
+     *
+     * @param maxRecords the upper bound on the number of records the returned RecordSet will yield
+     * @return a RecordSet backed by this one that stops after {@code maxRecords} records
+     * @throws IllegalArgumentException if {@code maxRecords} is negative
+     */
+    default RecordSet limit(final int maxRecords) {
+        if (maxRecords < 0) {
+            throw new IllegalArgumentException("Cannot limit number of records to " + maxRecords + ". Limit must be a non-negative integer");
+        }
+
+        final RecordSet source = this;
+        return new RecordSet() {
+            // Number of non-null records handed out through this view so far.
+            private int delivered = 0;
+
+            @Override
+            public RecordSchema getSchema() throws IOException {
+                return source.getSchema();
+            }
+
+            @Override
+            public Record next() throws IOException {
+                if (delivered < maxRecords) {
+                    final Record candidate = source.next();
+                    if (candidate != null) {
+                        delivered++;
+                    }
+
+                    return candidate;
+                }
+
+                // Limit reached: report end-of-set without touching the underlying RecordSet.
+                return null;
+            }
+        };
+    }
+
     public static RecordSet of(final RecordSchema schema, final Record... records) {
         return new RecordSet() {
             private int index = 0;