You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/06/19 15:30:54 UTC

[nifi] branch master updated: NIFI-7509: Added optional Record Writer property to all List* Processors

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a16002  NIFI-7509: Added optional Record Writer property to all List* Processors
0a16002 is described below

commit 0a16002076ad563ce6464c1ecb0440c6caec357b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Jun 4 21:27:56 2020 -0400

    NIFI-7509: Added optional Record Writer property to all List* Processors
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4315.
---
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml    |   5 +
 .../org/apache/nifi/processors/aws/s3/ListS3.java  | 451 ++++++++++++----
 .../additionalDetails.html                         | 114 ++++
 .../apache/nifi/processors/aws/s3/TestListS3.java  | 106 ++--
 .../azure/storage/ListAzureBlobStorage.java        |   7 +
 .../processors/azure/storage/utils/BlobInfo.java   |  61 +++
 .../nifi-processor-utils/pom.xml                   |  17 +-
 .../processor/util/list/AbstractListProcessor.java | 119 ++++-
 .../nifi/processor/util/list/ListableEntity.java   |   8 +
 .../processor/util/list/ListedEntityTracker.java   |  94 +++-
 .../util/list/TestAbstractListProcessor.java       |  61 ++-
 .../nifi/processors/gcp/AbstractGCPProcessor.java  |  16 +-
 .../processors/gcp/storage/FetchGCSObject.java     | 157 +-----
 .../nifi/processors/gcp/storage/ListGCSBucket.java | 435 ++++++++++-----
 .../processors/gcp/storage/StorageAttributes.java  |  79 +++
 .../additionalDetails.html                         | 144 +++++
 .../processors/gcp/storage/ListGCSBucketTest.java  | 581 ++++-----------------
 .../nifi-hdfs-processors/pom.xml                   |  10 +
 .../apache/nifi/processors/hadoop/ListHDFS.java    | 133 ++++-
 .../additionalDetails.html                         |  89 +++-
 .../nifi/processors/hadoop/TestListHDFS.java       |  73 ++-
 .../processors/standard/ListDatabaseTables.java    | 222 +++++++-
 .../apache/nifi/processors/standard/ListFTP.java   |  12 +-
 .../apache/nifi/processors/standard/ListFile.java  |   7 +
 .../nifi/processors/standard/ListFileTransfer.java |   7 +
 .../apache/nifi/processors/standard/ListSFTP.java  |  28 +-
 .../nifi/processors/standard/util/FileInfo.java    |  52 +-
 .../ListFTP-batch-high-level-flow.png              | Bin 0 -> 480307 bytes
 .../ListFTP-batch-processing.png                   | Bin 0 -> 467655 bytes
 .../additionalDetails.html                         | 150 ++++++
 .../ListFile-batch-high-level-flow.png             | Bin 0 -> 469420 bytes
 .../ListFile-batch-processing.png                  | Bin 0 -> 468988 bytes
 .../additionalDetails.html                         | 150 ++++++
 .../ListSFTP-batch-high-level-flow.png             | Bin 0 -> 467641 bytes
 .../ListSFTP-batch-processing.png                  | Bin 0 -> 479184 bytes
 .../additionalDetails.html                         | 150 ++++++
 .../standard/TestListDatabaseTables.java           |  34 ++
 37 files changed, 2539 insertions(+), 1033 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 6de133f..8d25db2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -74,6 +74,11 @@
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-sts</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 34c1dec..b9ebcd6 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -17,6 +17,9 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.s3.internal.Constants;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingResult;
@@ -60,12 +64,23 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.amazonaws.services.s3.AmazonS3;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -173,21 +188,49 @@ public class ListS3 extends AbstractS3Processor {
     public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
             .name("write-s3-user-metadata")
             .displayName("Write User Metadata")
-            .description("If set to 'True', the user defined metadata associated with the S3 object will be written as FlowFile attributes")
+            .description("If set to 'True', the user defined metadata associated with the S3 object will be added to FlowFile attributes/records")
             .required(true)
             .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
             .defaultValue("false")
             .build();
 
-
-    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
-                    AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
-                    SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
-                    PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE, REQUESTER_PAYS));
-
-    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
-            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
+            "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
+        .required(false)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+        BUCKET,
+        REGION,
+        ACCESS_KEY,
+        SECRET_KEY,
+        RECORD_WRITER,
+        MIN_AGE,
+        WRITE_OBJECT_TAGS,
+        WRITE_USER_METADATA,
+        CREDENTIALS_FILE,
+        AWS_CREDENTIALS_PROVIDER_SERVICE,
+        TIMEOUT,
+        SSL_CONTEXT_SERVICE,
+        ENDPOINT_OVERRIDE,
+        SIGNER_OVERRIDE,
+        PROXY_CONFIGURATION_SERVICE,
+        PROXY_HOST,
+        PROXY_HOST_PORT,
+        PROXY_USERNAME,
+        PROXY_PASSWORD,
+        DELIMITER,
+        PREFIX,
+        USE_VERSIONS,
+        LIST_TYPE,
+        REQUESTER_PAYS));
+
+    public static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
 
     public static final String CURRENT_TIMESTAMP = "currentTimestamp";
     public static final String CURRENT_KEY_PREFIX = "key-";
@@ -304,65 +347,89 @@ public class ListS3 extends AbstractS3Processor {
         final Set<String> listedKeys = new HashSet<>();
         getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
 
-        do {
-            versionListing = bucketLister.listVersions();
-            for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
-                long lastModified = versionSummary.getLastModified().getTime();
-                if (lastModified < currentTimestamp
-                        || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
-                        || lastModified > (listingTimestamp - minAgeMilliseconds)) {
-                    continue;
-                }
-
-                getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
-
-                // Create the attributes
-                final Map<String, String> attributes = new HashMap<>();
-                attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
-                attributes.put("s3.bucket", versionSummary.getBucketName());
-                if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
-                    attributes.put("s3.owner", versionSummary.getOwner().getId());
-                }
-                attributes.put("s3.etag", versionSummary.getETag());
-                attributes.put("s3.lastModified", String.valueOf(lastModified));
-                attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
-                attributes.put("s3.storeClass", versionSummary.getStorageClass());
-                attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
-                if (versionSummary.getVersionId() != null) {
-                    attributes.put("s3.version", versionSummary.getVersionId());
-                }
-
-                if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
-                    attributes.putAll(writeObjectTags(client, versionSummary));
-                }
-                if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
-                    attributes.putAll(writeUserMetadata(client, versionSummary));
-                }
-
-                // Create the flowfile
-                FlowFile flowFile = session.create();
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, REL_SUCCESS);
-
-                // Track the latest lastModified timestamp and keys having that timestamp.
-                // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
-                if (lastModified > latestListedTimestampInThisCycle) {
-                    latestListedTimestampInThisCycle = lastModified;
-                    listedKeys.clear();
-                    listedKeys.add(versionSummary.getKey());
-
-                } else if (lastModified == latestListedTimestampInThisCycle) {
-                    listedKeys.add(versionSummary.getKey());
-                }
+        final S3ObjectWriter writer;
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        if (writerFactory == null) {
+            writer = new AttributeObjectWriter(session);
+        } else {
+            writer = new RecordObjectWriter(session, writerFactory, getLogger());
+        }
 
-                listCount++;
-            }
-            bucketLister.setNextMarker();
+        try {
+            writer.beginListing();
+
+                do {
+                    versionListing = bucketLister.listVersions();
+                    for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+                        long lastModified = versionSummary.getLastModified().getTime();
+                        if (lastModified < currentTimestamp
+                            || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
+                            || lastModified > (listingTimestamp - minAgeMilliseconds)) {
+                            continue;
+                        }
+
+                        getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
+
+                        // Get object tags if configured to do so
+                        GetObjectTaggingResult taggingResult = null;
+                        if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+                            try {
+                                taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+                            } catch (final Exception e) {
+                                getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
+                                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+                            }
+                        }
+
+                        // Get user metadata if configured to do so
+                        ObjectMetadata objectMetadata = null;
+                        if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+                            try {
+                                objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+                            } catch (final Exception e) {
+                                getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
+                                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+                            }
+                        }
+
+                        // Write the entity to the listing
+                        writer.addToListing(versionSummary, taggingResult, objectMetadata);
+
+                        // Track the latest lastModified timestamp and keys having that timestamp.
+                        // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
+                        if (lastModified > latestListedTimestampInThisCycle) {
+                            latestListedTimestampInThisCycle = lastModified;
+                            listedKeys.clear();
+                            listedKeys.add(versionSummary.getKey());
+
+                        } else if (lastModified == latestListedTimestampInThisCycle) {
+                            listedKeys.add(versionSummary.getKey());
+                        }
+
+                        listCount++;
+                    }
+                    bucketLister.setNextMarker();
+
+                    totalListCount += listCount;
+
+                    if (listCount > 0 && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
+                        session.commit();
+                    }
+
+                    listCount = 0;
+                } while (bucketLister.isTruncated());
+
+                writer.finishListing();
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
+            writer.finishListingExceptionally(e);
+            session.rollback();
+            context.yield();
+            return;
+        }
 
-            totalListCount += listCount;
-            commit(context, session, listCount);
-            listCount = 0;
-        } while (bucketLister.isTruncated());
+        session.commit();
 
         // Update currentKeys.
         if (latestListedTimestampInThisCycle > currentTimestamp) {
@@ -383,50 +450,17 @@ public class ListS3 extends AbstractS3Processor {
         }
     }
 
-    private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
-        boolean willCommit = listCount > 0;
-        if (willCommit) {
-            getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
-            session.commit();
-        }
-        return willCommit;
-    }
-
-    private Map<String, String> writeObjectTags(AmazonS3 client, S3VersionSummary versionSummary) {
-        final GetObjectTaggingResult taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
-        final Map<String, String> tagMap = new HashMap<>();
-
-        if (taggingResult != null) {
-            final List<Tag> tags = taggingResult.getTagSet();
-
-            for (final Tag tag : tags) {
-                tagMap.put("s3.tag." + tag.getKey(), tag.getValue());
-            }
-        }
-        return tagMap;
-    }
-
-    private Map<String, String> writeUserMetadata(AmazonS3 client, S3VersionSummary versionSummary) {
-        ObjectMetadata objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
-        final Map<String, String> metadata = new HashMap<>();
-        if (objectMetadata != null) {
-            for (Map.Entry<String, String> e : objectMetadata.getUserMetadata().entrySet()) {
-                metadata.put("s3.user.metadata." + e.getKey(), e.getValue());
-            }
-        }
-        return metadata;
-    }
 
     private interface S3BucketLister {
-        public void setBucketName(String bucketName);
-        public void setPrefix(String prefix);
-        public void setDelimiter(String delimiter);
-        public void setRequesterPays(boolean requesterPays);
+        void setBucketName(String bucketName);
+        void setPrefix(String prefix);
+        void setDelimiter(String delimiter);
+        void setRequesterPays(boolean requesterPays);
         // Versions have a superset of the fields that Objects have, so we'll use
         // them as a common interface
-        public VersionListing listVersions();
-        public void setNextMarker();
-        public boolean isTruncated();
+        VersionListing listVersions();
+        void setNextMarker();
+        boolean isTruncated();
     }
 
     public class S3ObjectBucketLister implements S3BucketLister {
@@ -597,4 +631,207 @@ public class ListS3 extends AbstractS3Processor {
             return (versionListing == null) ? false : versionListing.isTruncated();
         }
     }
+
+    interface S3ObjectWriter {
+        void beginListing() throws IOException, SchemaNotFoundException;
+
+        void addToListing(S3VersionSummary summary, GetObjectTaggingResult taggingResult, ObjectMetadata objectMetadata) throws IOException;
+
+        void finishListing() throws IOException;
+
+        void finishListingExceptionally(Exception cause);
+
+        boolean isCheckpoint();
+    }
+
+    static class RecordObjectWriter implements S3ObjectWriter {
+        private static final RecordSchema RECORD_SCHEMA;
+
+        private static final String KEY = "key";
+        private static final String BUCKET = "bucket";
+        private static final String OWNER = "owner";
+        private static final String ETAG = "etag";
+        private static final String LAST_MODIFIED = "lastModified";
+        private static final String SIZE = "size";
+        private static final String STORAGE_CLASS = "storageClass";
+        private static final String IS_LATEST = "latest";
+        private static final String VERSION_ID = "versionId";
+        private static final String TAGS = "tags";
+        private static final String USER_METADATA = "userMetadata";
+
+        static {
+            final List<RecordField> fields = new ArrayList<>();
+            fields.add(new RecordField(KEY, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(BUCKET, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType(), true));
+            fields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+            fields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+            fields.add(new RecordField(STORAGE_CLASS, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(IS_LATEST, RecordFieldType.BOOLEAN.getDataType(), false));
+            fields.add(new RecordField(VERSION_ID, RecordFieldType.STRING.getDataType(), true));
+            fields.add(new RecordField(TAGS, RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), true));
+            fields.add(new RecordField(USER_METADATA, RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()), true));
+
+            RECORD_SCHEMA = new SimpleRecordSchema(fields);
+        }
+
+
+        private final ProcessSession session;
+        private final RecordSetWriterFactory writerFactory;
+        private final ComponentLog logger;
+        private RecordSetWriter recordWriter;
+        private FlowFile flowFile;
+
+        public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+            this.session = session;
+            this.writerFactory = writerFactory;
+            this.logger = logger;
+        }
+
+        @Override
+        public void beginListing() throws IOException, SchemaNotFoundException {
+            flowFile = session.create();
+
+            final OutputStream out = session.write(flowFile);
+            recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile);
+            recordWriter.beginRecordSet();
+        }
+
+        @Override
+        public void addToListing(final S3VersionSummary summary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) throws IOException {
+            recordWriter.write(createRecordForListing(summary, taggingResult, objectMetadata));
+        }
+
+        @Override
+        public void finishListing() throws IOException {
+            final WriteResult writeResult = recordWriter.finishRecordSet();
+            recordWriter.close();
+
+            if (writeResult.getRecordCount() == 0) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        }
+
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+            try {
+                recordWriter.close();
+            } catch (IOException e) {
+                logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e);
+            }
+
+            session.remove(flowFile);
+        }
+
+        @Override
+        public boolean isCheckpoint() {
+            return false;
+        }
+
+        private Record createRecordForListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
+            final Map<String, Object> values = new HashMap<>();
+            values.put(KEY, versionSummary.getKey());
+            values.put(BUCKET, versionSummary.getBucketName());
+
+            if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
+                values.put(OWNER, versionSummary.getOwner().getId());
+            }
+
+            values.put(ETAG, versionSummary.getETag());
+            values.put(LAST_MODIFIED, new Timestamp(versionSummary.getLastModified().getTime()));
+            values.put(SIZE, versionSummary.getSize());
+            values.put(STORAGE_CLASS, versionSummary.getStorageClass());
+            values.put(IS_LATEST, versionSummary.isLatest());
+            final String versionId = versionSummary.getVersionId();
+            if (versionId != null && !versionId.equals(Constants.NULL_VERSION_ID)) {
+                values.put(VERSION_ID, versionSummary.getVersionId());
+            }
+
+            if (taggingResult != null) {
+                final Map<String, String> tags = new HashMap<>();
+                taggingResult.getTagSet().forEach(tag -> {
+                    tags.put(tag.getKey(), tag.getValue());
+                });
+
+                values.put(TAGS, tags);
+            }
+
+            if (objectMetadata != null) {
+                values.put(USER_METADATA, objectMetadata.getUserMetadata());
+            }
+
+            return new MapRecord(RECORD_SCHEMA, values);
+        }
+    }
+
+
+
+    static class AttributeObjectWriter implements S3ObjectWriter {
+        private final ProcessSession session;
+
+        public AttributeObjectWriter(final ProcessSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public void beginListing() {
+        }
+
+        @Override
+        public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
+            // Create the attributes
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
+            attributes.put("s3.bucket", versionSummary.getBucketName());
+            if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
+                attributes.put("s3.owner", versionSummary.getOwner().getId());
+            }
+            attributes.put("s3.etag", versionSummary.getETag());
+            attributes.put("s3.lastModified", String.valueOf(versionSummary.getLastModified().getTime()));
+            attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
+            attributes.put("s3.storeClass", versionSummary.getStorageClass());
+            attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
+            if (versionSummary.getVersionId() != null) {
+                attributes.put("s3.version", versionSummary.getVersionId());
+            }
+
+            if (taggingResult != null) {
+                final List<Tag> tags = taggingResult.getTagSet();
+                for (final Tag tag : tags) {
+                    attributes.put("s3.tag." + tag.getKey(), tag.getValue());
+                }
+            }
+
+            if (objectMetadata != null) {
+                for (Map.Entry<String, String> e : objectMetadata.getUserMetadata().entrySet()) {
+                    attributes.put("s3.user.metadata." + e.getKey(), e.getValue());
+                }
+            }
+
+            // Create the flowfile
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+
+        @Override
+        public void finishListing() throws IOException {
+        }
+
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+        }
+
+        @Override
+        public boolean isCheckpoint() {
+            return false;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.ListS3/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.ListS3/additionalDetails.html
new file mode 100644
index 0000000..c717e98
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.ListS3/additionalDetails.html
@@ -0,0 +1,114 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>ListS3</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>Streaming Versus Batch Processing</h1>
+
+<p>
+    ListS3 performs a listing of all S3 Objects that it encounters in the configured S3 bucket.
+    There are two common, broadly defined use cases.
+</p>
+
+<h3>Streaming Use Case</h3>
+
+<p>
+    By default, the Processor will create a separate FlowFile for each object in the bucket and add attributes for filename, bucket, etc.
+    A common use case is to connect ListS3 to the FetchS3 processor. These two processors used in conjunction with one another provide the ability to
+    easily monitor a bucket and fetch the contents of any new object as it lands in S3 in an efficient streaming fashion.
+</p>
+
+<h3>Batch Use Case</h3>
+<p>
+    Another common use case is the desire to process all newly arriving objects in a given bucket, and to then perform some action
+    only when all objects have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently
+    a streaming platform in that there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.
+</p>
+
+<p>
+    To solve this, the ListS3 Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single
+    FlowFile will be created that will contain a Record for each object in the bucket, instead of a separate FlowFile per object.
+    See the documentation for ListFile for an example of how to build a dataflow that allows for processing all of the objects before proceeding
+    with any other step.
+</p>
+
+<p>
+    One important difference between the data produced by ListFile and ListS3, though, is the structure of the Records that are emitted. The Records
+    emitted by ListFile have a different schema than those emitted by ListS3. ListS3 emits records that follow the following schema (in Avro format):
+</p>
+
+<code>
+    <pre>
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [{
+    "name": "key",
+    "type": "string"
+  }, {
+    "name": "bucket",
+    "type": "string"
+  }, {
+    "name": "owner",
+    "type": ["null", "string"]
+  }, {
+    "name": "etag",
+    "type": "string"
+  }, {
+    "name": "lastModified",
+    "type": {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }
+  }, {
+    "name": "size",
+    "type": "long"
+  }, {
+    "name": "storageClass",
+    "type": "string"
+  }, {
+    "name": "latest",
+    "type": "boolean"
+  }, {
+    "name": "versionId",
+    "type": ["null", "string"]
+  }, {
+    "name": "tags",
+    "type": ["null", {
+      "type": "map",
+      "values": "string"
+    }]
+  }, {
+    "name": "userMetadata",
+    "type": ["null", {
+      "type": "map",
+      "values": "string"
+    }]
+  }]
+}
+    </pre>
+</code>
+
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 679ca7b..cab9e32 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -16,13 +16,7 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
@@ -34,21 +28,28 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.S3VersionSummary;
 import com.amazonaws.services.s3.model.VersionListing;
 import org.apache.commons.lang3.time.DateUtils;
-import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
-import com.amazonaws.services.s3.AmazonS3Client;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -120,6 +121,57 @@ public class TestListS3 {
     }
 
     @Test
+    public void testListWithRecords() throws InitializationException {
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+        runner.addControllerService("record-writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+        runner.setProperty(ListS3.RECORD_WRITER, "record-writer");
+
+        Date lastModified = new Date();
+        ObjectListing objectListing = new ObjectListing();
+        S3ObjectSummary objectSummary1 = new S3ObjectSummary();
+        objectSummary1.setBucketName("test-bucket");
+        objectSummary1.setKey("a");
+        objectSummary1.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary1);
+        S3ObjectSummary objectSummary2 = new S3ObjectSummary();
+        objectSummary2.setBucketName("test-bucket");
+        objectSummary2.setKey("b/c");
+        objectSummary2.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary2);
+        S3ObjectSummary objectSummary3 = new S3ObjectSummary();
+        objectSummary3.setBucketName("test-bucket");
+        objectSummary3.setKey("d/e");
+        objectSummary3.setLastModified(lastModified);
+        objectListing.getObjectSummaries().add(objectSummary3);
+        Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
+        runner.run();
+
+        ArgumentCaptor<ListObjectsRequest> captureRequest = ArgumentCaptor.forClass(ListObjectsRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).listObjects(captureRequest.capture());
+        ListObjectsRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertFalse(request.isRequesterPays());
+        Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+
+        final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        final String lastModifiedString = dateFormat.format(lastModified);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals("record.count", "3");
+        flowFile.assertContentEquals("a,test-bucket,,," + lastModifiedString + ",0,,true,,,\n"
+            + "b/c,test-bucket,,," + lastModifiedString + ",0,,true,,,\n"
+            + "d/e,test-bucket,,," + lastModifiedString + ",0,,true,,,\n");
+    }
+
+    @Test
     public void testListWithRequesterPays() {
         runner.setProperty(ListS3.REGION, "eu-west-1");
         runner.setProperty(ListS3.BUCKET, "test-bucket");
@@ -453,34 +505,4 @@ public class TestListS3 {
 
         Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
     }
-
-    @Test
-    public void testGetPropertyDescriptors() throws Exception {
-        ListS3 processor = new ListS3();
-        List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 23, pd.size());
-        assertTrue(pd.contains(ListS3.ACCESS_KEY));
-        assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
-        assertTrue(pd.contains(ListS3.BUCKET));
-        assertTrue(pd.contains(ListS3.CREDENTIALS_FILE));
-        assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
-        assertTrue(pd.contains(ListS3.REGION));
-        assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS));
-        assertTrue(pd.contains(ListS3.WRITE_USER_METADATA));
-        assertTrue(pd.contains(ListS3.SECRET_KEY));
-        assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
-        assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));
-        assertTrue(pd.contains(ListS3.TIMEOUT));
-        assertTrue(pd.contains(ListS3.DELIMITER));
-        assertTrue(pd.contains(ListS3.PREFIX));
-        assertTrue(pd.contains(ListS3.USE_VERSIONS));
-        assertTrue(pd.contains(ListS3.LIST_TYPE));
-        assertTrue(pd.contains(ListS3.MIN_AGE));
-        assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
-        assertTrue(pd.contains(ListS3.PROXY_HOST));
-        assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
-        assertTrue(pd.contains(ListS3.PROXY_USERNAME));
-        assertTrue(pd.contains(ListS3.PROXY_PASSWORD));
-        assertTrue(pd.contains(ListS3.REQUESTER_PAYS));
-    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 1670732..8c82dbe 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -60,6 +60,7 @@ import org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
 import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -96,6 +97,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
 
     private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             LISTING_STRATEGY,
+            AbstractListProcessor.RECORD_WRITER,
             AzureStorageUtils.CONTAINER,
             AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
             AzureStorageUtils.ACCOUNT_NAME,
@@ -158,6 +160,11 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
     }
 
     @Override
+    protected RecordSchema getRecordSchema() {
+        return BlobInfo.getRecordSchema();
+    }
+
+    @Override
     protected String getDefaultTimePrecision() {
         // User does not have to choose one.
         // AUTO_DETECT can handle most cases, but it may incur longer latency
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
index 15b0fe0..96e1c8c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -17,12 +17,52 @@
 package org.apache.nifi.processors.azure.storage.utils;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEntity {
     private static final long serialVersionUID = 1L;
 
+    private static final RecordSchema SCHEMA;
+    private static final String BLOB_NAME = "blobName";
+    private static final String BLOB_TYPE = "blobType";
+    private static final String FILENAME = "filename";
+    private static final String CONTAINER_NAME = "container";
+    private static final String LENGTH = "length";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String ETAG = "etag";
+    private static final String CONTENT_LANGUAGE = "language";
+    private static final String CONTENT_TYPE = "contentType";
+    private static final String PRIMARY_URI = "primaryUri";
+    private static final String SECONDARY_URI = "secondaryUri";
+
+    static {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(BLOB_NAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(BLOB_TYPE, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(CONTAINER_NAME, RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(LENGTH, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(CONTENT_LANGUAGE, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(CONTENT_TYPE, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(PRIMARY_URI, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(SECONDARY_URI, RecordFieldType.STRING.getDataType()));
+        SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
+
     private final String primaryUri;
     private final String secondaryUri;
     private final String contentType;
@@ -78,6 +118,27 @@ public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEnt
         return blobType;
     }
 
+    @Override
+    public Record toRecord() {
+        final Map<String, Object> values = new HashMap<>();
+        values.put(PRIMARY_URI, getPrimaryUri());
+        values.put(SECONDARY_URI, getSecondaryUri());
+        values.put(CONTENT_TYPE, getContentType());
+        values.put(CONTENT_LANGUAGE, getContentLanguage());
+        values.put(CONTAINER_NAME, getContainerName());
+        values.put(BLOB_NAME, getBlobName());
+        values.put(FILENAME, getName());
+        values.put(ETAG, getEtag());
+        values.put(LAST_MODIFIED, getLastModifiedTime());
+        values.put(LENGTH, getLength());
+        values.put(BLOB_TYPE, getBlobType());
+        return new MapRecord(SCHEMA, values);
+    }
+
+    public static RecordSchema getRecordSchema() {
+        return SCHEMA;
+    }
+
     public static final class Builder {
         private String primaryUri;
         private String secondaryUri;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 96ba7dd..b01742e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -25,7 +25,7 @@
     <description>
         This nifi-processor-utils module is designed to capture common patterns
         and utilities that can be leveraged by other processors or components to
-        help promote reuse.  These patterns may become framework level features 
+        help promote reuse.  These patterns may become framework level features
         or may simply be made available through this utility.  It is ok for this
         module to have dependencies but care should be taken when adding dependencies
         as this increases the cost of utilizing this module in various nars.
@@ -78,6 +78,21 @@
             <artifactId>junit</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index af5775f..1d9bba8 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
@@ -44,6 +45,11 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.File;
@@ -144,7 +150,7 @@ import java.util.stream.Collectors;
     + "The scope used depends on the implementation.")
 public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
 
-    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new Builder()
         .name("Distributed Cache Service")
         .description("NOTE: This property is used merely for migration from old NiFi version before state management was introduced at version 0.5.0. "
             + "The stored value in the cache service will be migrated into the state when this processor is started at the first time. "
@@ -164,7 +170,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
     public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
 
-    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new Builder()
         .name("target-system-timestamp-precision")
         .displayName("Target System Timestamp Precision")
         .description("Specify timestamp precision at the target system."
@@ -192,7 +198,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                     " However additional DistributedMapCache controller service is required and more JVM heap memory is used." +
                     " See the description of 'Entity Tracking Time Window' property for further details on how it works.");
 
-    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
         .name("listing-strategy")
         .displayName("Listing Strategy")
         .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
@@ -201,6 +207,15 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         .defaultValue(BY_TIMESTAMPS.getValue())
         .build();
 
+    public static final PropertyDescriptor RECORD_WRITER = new Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
+            "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
+        .required(false)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
     /**
      * Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
      * It does not necessary mean it has been processed as well.
@@ -508,7 +523,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             }
         }
 
-        int flowfilesCreated = 0;
+        int entitiesListed = 0;
 
         if (orderedEntries.size() > 0) {
             latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
@@ -554,27 +569,24 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 }
             }
 
-            for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
-                List<T> entities = timestampEntities.getValue();
-                if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
-                    // Filter out previously processed entities.
-                    entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
-                }
 
-                for (T entity : entities) {
-                    // Create the FlowFile for this path.
-                    final Map<String, String> attributes = createAttributes(entity, context);
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    session.transfer(flowFile, REL_SUCCESS);
-                    flowfilesCreated++;
+            final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
+            if (writerSet) {
+                try {
+                    entitiesListed = createRecordsForEntities(context, session, orderedEntries);
+                } catch (final IOException | SchemaNotFoundException e) {
+                    getLogger().error("Failed to write listing to FlowFile", e);
+                    context.yield();
+                    return;
                 }
+            } else {
+                entitiesListed = createFlowFilesForEntities(context, session, orderedEntries);
             }
         }
 
         // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
         if (latestListedEntryTimestampThisCycleMillis != null) {
-            boolean processedNewFiles = flowfilesCreated > 0;
+            boolean processedNewFiles = entitiesListed > 0;
             if (processedNewFiles) {
                 // If there have been files created, update the last timestamp we processed.
                 // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
@@ -587,7 +599,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 // Capture latestIdentifierProcessed.
                 latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
                 lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
-                getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
+                getLogger().info("Successfully created listing with {} new objects", new Object[]{entitiesListed});
                 session.commit();
             }
 
@@ -624,6 +636,69 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         }
     }
 
+    private int createRecordsForEntities(final ProcessContext context, final ProcessSession session, final Map<Long, List<T>> orderedEntries) throws IOException, SchemaNotFoundException {
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        int entitiesListed = 0;
+        FlowFile flowFile = session.create();
+        final WriteResult writeResult;
+
+        try (final OutputStream out = session.write(flowFile);
+            final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+
+            recordSetWriter.beginRecordSet();
+            for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
+                List<T> entities = timestampEntities.getValue();
+                if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // Filter out previously processed entities.
+                    entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
+                }
+
+                for (T entity : entities) {
+                    entitiesListed++;
+                    recordSetWriter.write(entity.toRecord());
+                }
+            }
+
+            writeResult = recordSetWriter.finishRecordSet();
+        }
+
+        if (entitiesListed == 0) {
+            session.remove(flowFile);
+            return 0;
+        }
+
+        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(flowFile, REL_SUCCESS);
+        return entitiesListed;
+    }
+
+    private int createFlowFilesForEntities(final ProcessContext context, final ProcessSession session, final Map<Long, List<T>> orderedEntries) {
+        int entitiesListed = 0;
+        for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
+            List<T> entities = timestampEntities.getValue();
+            if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                // Filter out previously processed entities.
+                entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
+            }
+
+            for (T entity : entities) {
+                entitiesListed++;
+
+                // Create the FlowFile for this path.
+                final Map<String, String> attributes = createAttributes(entity, context);
+                FlowFile flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        }
+
+        return entitiesListed;
+    }
+
     /**
      * This method is intended to be overridden by SubClasses those do not support TARGET_SYSTEM_TIMESTAMP_PRECISION property.
      * So that it use return different precisions than PRECISION_AUTO_DETECT.
@@ -693,6 +768,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
      */
     protected abstract Scope getStateScope(final PropertyContext context);
 
+    /**
+     * @return the RecordSchema that will be used for any Records that are produced by the Processor
+     */
+    protected abstract RecordSchema getRecordSchema();
 
     private static class StringSerDe implements Serializer<String>, Deserializer<String> {
         @Override
@@ -732,7 +811,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     }
 
     protected ListedEntityTracker<T> createListedEntityTracker() {
-        return new ListedEntityTracker<>(getIdentifier(), getLogger());
+        return new ListedEntityTracker<>(getIdentifier(), getLogger(), getRecordSchema());
     }
 
     private void listByTrackingEntities(ProcessContext context, ProcessSession session) throws ProcessException {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
index f769e78..212e7d6 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
@@ -17,6 +17,8 @@
 
 package org.apache.nifi.processor.util.list;
 
+import org.apache.nifi.serialization.record.Record;
+
 public interface ListableEntity {
 
     /**
@@ -42,4 +44,10 @@ public interface ListableEntity {
      */
     long getSize();
 
+    /**
+     * @return a Record that represents this entity
+     */
+    default Record toRecord() {
+        throw new UnsupportedOperationException("Entities of type " + getClass() + " do not support conversion to Records");
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
index 6c36ba9..3bd1992 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
@@ -33,13 +33,21 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.stream.io.GZIPOutputStream;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -140,6 +148,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
 
     private final String componentId;
     private final ComponentLog logger;
+    private final RecordSchema recordSchema;
 
     /*
      * The scope, nodeId and mapCacheClient being used at the previous trackEntities method execution is captured,
@@ -149,18 +158,19 @@ public class ListedEntityTracker<T extends ListableEntity> {
     private String nodeId;
     private DistributedMapCacheClient mapCacheClient;
 
-    ListedEntityTracker(String componentId, ComponentLog logger) {
-        this(componentId, logger, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER);
+    ListedEntityTracker(final String componentId, final ComponentLog logger, final RecordSchema recordSchema) {
+        this(componentId, logger, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER, recordSchema);
     }
 
     /**
      * This constructor is used by unit test code so that it can produce the consistent result by controlling current timestamp.
      * @param currentTimestampSupplier a function to return current timestamp.
      */
-    ListedEntityTracker(String componentId, ComponentLog logger, Supplier<Long> currentTimestampSupplier) {
+    ListedEntityTracker(final String componentId, final ComponentLog logger, final Supplier<Long> currentTimestampSupplier, final RecordSchema recordSchema) {
         this.componentId = componentId;
         this.logger = logger;
         this.currentTimestampSupplier = currentTimestampSupplier;
+        this.recordSchema = recordSchema;
     }
 
     static void validateProperties(ValidationContext context, Collection<ValidationResult> results, Scope scope) {
@@ -237,9 +247,8 @@ public class ListedEntityTracker<T extends ListableEntity> {
         }
 
         if (alreadyListedEntities == null || justElectedPrimaryNode) {
-            logger.info(justElectedPrimaryNode
-                    ? "Just elected as Primary node, restoring already-listed entities."
-                    : "At the first onTrigger, restoring already-listed entities.");
+            logger.info(justElectedPrimaryNode ? "Just elected as Primary node, restoring already-listed entities." : "At the first onTrigger, restoring already-listed entities.");
+
             try {
                 final Map<String, ListedEntity> fetchedListedEntities = fetchListedEntities();
                 if (fetchedListedEntities == null) {
@@ -283,14 +292,12 @@ public class ListedEntityTracker<T extends ListableEntity> {
             }
 
             if (entity.getTimestamp() > alreadyListedEntity.getTimestamp()) {
-                logger.trace("Picked {} having newer timestamp {} than {}.",
-                        new Object[]{identifier, entity.getTimestamp(), alreadyListedEntity.getTimestamp()});
+                logger.trace("Picked {} having newer timestamp {} than {}.", new Object[]{identifier, entity.getTimestamp(), alreadyListedEntity.getTimestamp()});
                 return true;
             }
 
             if (entity.getSize() != alreadyListedEntity.getSize()) {
-                logger.trace("Picked {} having different size {} than {}.",
-                        new Object[]{identifier, entity.getSize(), alreadyListedEntity.getSize()});
+                logger.trace("Picked {} having different size {} than {}.", new Object[]{identifier, entity.getSize(), alreadyListedEntity.getSize()});
                 return true;
             }
 
@@ -313,25 +320,23 @@ public class ListedEntityTracker<T extends ListableEntity> {
         oldEntityIds.forEach(oldEntityId -> alreadyListedEntities.remove(oldEntityId));
 
         // Emit updated entities.
-        for (T updatedEntity : updatedEntities) {
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, createAttributes.apply(updatedEntity));
-            session.transfer(flowFile, REL_SUCCESS);
-            // In order to reduce object size, discard meta data captured at the sub-classes.
-            final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
-            alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+        if (context.getProperty(AbstractListProcessor.RECORD_WRITER).isSet()) {
+            try {
+                createRecordsForEntities(context, session, updatedEntities);
+            } catch (final IOException | SchemaNotFoundException e) {
+                logger.error("Failed to create records for listed entities", e);
+            }
+        } else {
+            createFlowFilesForEntities(session, updatedEntities, createAttributes);
         }
 
         // Commit ProcessSession before persisting listed entities.
         // In case persisting listed entities failure, same entities may be listed again, but better than not listing.
         session.commit();
         try {
-            logger.debug("Removed old entities count: {}, Updated entities count: {}",
-                    new Object[]{oldEntityIds.size(), updatedEntities.size()});
-            if (logger.isTraceEnabled()) {
-                logger.trace("Removed old entities: {}, Updated entities: {}",
-                        new Object[]{oldEntityIds, updatedEntities});
-            }
+            logger.debug("Removed old entities count: {}, Updated entities count: {}", new Object[]{oldEntityIds.size(), updatedEntities.size()});
+            logger.trace("Removed old entities: {}, Updated entities: {}", new Object[]{oldEntityIds, updatedEntities});
+
             persistListedEntities(alreadyListedEntities);
         } catch (IOException e) {
             throw new ProcessException("Failed to persist already-listed entities due to " + e, e);
@@ -339,4 +344,47 @@ public class ListedEntityTracker<T extends ListableEntity> {
 
     }
 
+    private void createRecordsForEntities(final ProcessContext context, final ProcessSession session, final List<T> updatedEntities) throws IOException, SchemaNotFoundException {
+        if (updatedEntities.isEmpty()) {
+            logger.debug("No entities to write records for");
+            return;
+        }
+
+        final RecordSetWriterFactory writerFactory = context.getProperty(AbstractListProcessor.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        final WriteResult writeResult;
+        try (final OutputStream out = session.write(flowFile);
+             final RecordSetWriter recordSetWriter = writerFactory.createWriter(logger, recordSchema, out, Collections.emptyMap())) {
+
+            recordSetWriter.beginRecordSet();
+            for (T updatedEntity : updatedEntities) {
+                recordSetWriter.write(updatedEntity.toRecord());
+
+                // In order to reduce object size, discard meta data captured at the sub-classes.
+                final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+                alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+            }
+
+            writeResult = recordSetWriter.finishRecordSet();
+        }
+
+        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private void createFlowFilesForEntities(final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {
+        for (T updatedEntity : updatedEntities) {
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, createAttributes.apply(updatedEntity));
+            session.transfer(flowFile, REL_SUCCESS);
+            // In order to reduce object size, discard meta data captured at the sub-classes.
+            final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+            alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+        }
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index e893332..dd8fdaa 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -29,7 +29,16 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -71,8 +80,6 @@ public class TestAbstractListProcessor {
         return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 2;
     }
 
-    private static final long DEFAULT_SLEEP_MILLIS = getSleepMillis(TimeUnit.MILLISECONDS);
-
     private ConcreteListProcessor proc;
     private TestRunner runner;
 
@@ -175,6 +182,33 @@ public class TestAbstractListProcessor {
     }
 
     @Test
+    public void testWriteRecords() throws InitializationException {
+        final RecordSetWriterFactory writerFactory = new MockRecordWriter("id,name,timestamp,size", false);
+        runner.addControllerService("record-writer", writerFactory);
+        runner.enableControllerService(writerFactory);
+
+        runner.setProperty(AbstractListProcessor.RECORD_WRITER, "record-writer");
+
+        runner.run();
+
+        assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size());
+        proc.addEntity("name", "identifier", 4L);
+        proc.addEntity("name2", "identifier2", 8L);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1);
+
+        final MockFlowFile flowfile = runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0);
+        flowfile.assertAttributeEquals("record.count", "2");
+        flowfile.assertContentEquals("id,name,timestamp,size\nidentifier,name,4,0\nidentifier2,name2,8,0\n");
+
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0);
+    }
+
+    @Test
     public void testEntityTrackingStrategy() throws InitializationException {
         runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES);
         // Require a cache service.
@@ -351,13 +385,14 @@ public class TestAbstractListProcessor {
 
         @Override
         protected ListedEntityTracker<ListableEntity> createListedEntityTracker() {
-            return new ListedEntityTracker<>(getIdentifier(), getLogger(), () -> currentTimestamp.get());
+            return new ListedEntityTracker<>(getIdentifier(), getLogger(), currentTimestamp::get, null);
         }
 
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
             final List<PropertyDescriptor> properties = new ArrayList<>();
             properties.add(LISTING_STRATEGY);
+            properties.add(RECORD_WRITER);
             properties.add(DISTRIBUTED_CACHE_SERVICE);
             properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
             properties.add(ListedEntityTracker.TRACKING_STATE_CACHE);
@@ -397,6 +432,16 @@ public class TestAbstractListProcessor {
                 public long getSize() {
                     return size;
                 }
+
+                @Override
+                public Record toRecord() {
+                    final Map<String, Object> values = new HashMap<>(4);
+                    values.put("id", identifier);
+                    values.put("name", name);
+                    values.put("timestamp", timestamp);
+                    values.put("size", size);
+                    return new MapRecord(getRecordSchema(), values);
+                }
             };
 
             entities.put(entity.getIdentifier(), entity);
@@ -432,5 +477,15 @@ public class TestAbstractListProcessor {
         protected Scope getStateScope(final PropertyContext context) {
             return Scope.CLUSTER;
         }
+
+        @Override
+        protected RecordSchema getRecordSchema() {
+            final List<RecordField> fields = new ArrayList<>();
+            fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+            fields.add(new RecordField("size", RecordFieldType.LONG.getDataType()));
+            return new SimpleRecordSchema(fields);
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 0c360d1..626b5d2 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -122,14 +122,14 @@ public abstract class AbstractGCPProcessor<
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.of(
-                GCP_CREDENTIALS_PROVIDER_SERVICE,
-                PROJECT_ID,
-                RETRY_COUNT,
-                PROXY_HOST,
-                PROXY_PORT,
-                HTTP_PROXY_USERNAME,
-                HTTP_PROXY_PASSWORD,
-                ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
+            PROJECT_ID,
+            GCP_CREDENTIALS_PROVIDER_SERVICE,
+            RETRY_COUNT,
+            PROXY_HOST,
+            PROXY_PORT,
+            HTTP_PROXY_USERNAME,
+            HTTP_PROXY_PASSWORD,
+            ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
         );
     }
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index 2adfd29..724a88f 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -18,10 +18,8 @@
 package org.apache.nifi.processors.gcp.storage;
 
 import com.google.cloud.ReadChannel;
-import com.google.cloud.storage.Acl;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
-import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableList;
@@ -42,7 +40,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 import java.nio.channels.Channels;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -97,8 +94,7 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
 @CapabilityDescription("Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket.")
 @SeeAlso({ListGCSBucket.class, PutGCSObject.class, DeleteGCSObject.class})
 @WritesAttributes({
-        @WritesAttribute(attribute = "filename", description = "The name of the file, parsed if possible from the " +
-                "Content-Disposition response header"),
+        @WritesAttribute(attribute = "filename", description = "The name of the file, parsed if possible from the Content-Disposition response header"),
         @WritesAttribute(attribute = BUCKET_ATTR, description = BUCKET_DESC),
         @WritesAttribute(attribute = KEY_ATTR, description = KEY_DESC),
         @WritesAttribute(attribute = SIZE_ATTR, description = SIZE_DESC),
@@ -136,7 +132,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
 
     public static final PropertyDescriptor KEY = new PropertyDescriptor
             .Builder().name("gcs-key")
-            .displayName("Key")
+            .displayName("Name")
             .description(KEY_DESC)
             .required(true)
             .defaultValue("${" + CoreAttributes.FILENAME.key() + "}")
@@ -147,7 +143,7 @@ public class FetchGCSObject extends AbstractGCSProcessor {
     public static final PropertyDescriptor GENERATION = new PropertyDescriptor.Builder()
             .name("gcs-generation")
             .displayName("Object Generation")
-            .description("The generation of the Object to download. If null, will download latest generation.")
+            .description("The generation of the Object to download. If not set, the latest generation will be downloaded.")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
             .required(false)
@@ -166,12 +162,12 @@ public class FetchGCSObject extends AbstractGCSProcessor {
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
-                .addAll(super.getSupportedPropertyDescriptors())
-                .add(BUCKET)
-                .add(KEY)
-                .add(GENERATION)
-                .add(ENCRYPTION_KEY)
-                .build();
+            .add(BUCKET)
+            .add(KEY)
+            .addAll(super.getSupportedPropertyDescriptors())
+            .add(GENERATION)
+            .add(ENCRYPTION_KEY)
+            .build();
     }
 
 
@@ -185,21 +181,12 @@ public class FetchGCSObject extends AbstractGCSProcessor {
 
         final long startNanos = System.nanoTime();
 
-        String bucketName = context.getProperty(BUCKET)
-                                   .evaluateAttributeExpressions(flowFile)
-                                   .getValue();
-        String key = context.getProperty(KEY)
-                                   .evaluateAttributeExpressions(flowFile)
-                                   .getValue();
-        Long generation = context.getProperty(GENERATION)
-                                    .evaluateAttributeExpressions(flowFile)
-                                    .asLong();
-        String encryptionKey = context.getProperty(ENCRYPTION_KEY)
-                                    .evaluateAttributeExpressions(flowFile)
-                                    .getValue();
+        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(flowFile).asLong();
+        final String encryptionKey = context.getProperty(ENCRYPTION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
         final Storage storage = getCloudService();
-        final Map<String, String> attributes = new HashMap<>();
         final BlobId blobId = BlobId.of(bucketName, key, generation);
 
         try {
@@ -214,134 +201,26 @@ public class FetchGCSObject extends AbstractGCSProcessor {
             }
 
             final Blob blob = storage.get(blobId);
-
             if (blob == null) {
                 throw new StorageException(404, "Blob " + blobId + " not found");
             }
 
-            final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[blobSourceOptions.size()]));
-
+            final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
             flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
 
-            attributes.put(BUCKET_ATTR, blob.getBucket());
-            attributes.put(KEY_ATTR, blob.getName());
-
-            if (blob.getSize() != null) {
-                attributes.put(SIZE_ATTR, String.valueOf(blob.getSize()));
-            }
-
-            if (blob.getCacheControl() != null) {
-                attributes.put(CACHE_CONTROL_ATTR, blob.getCacheControl());
-            }
-
-            if (blob.getComponentCount() != null) {
-                attributes.put(COMPONENT_COUNT_ATTR, String.valueOf(blob.getComponentCount()));
-            }
-
-            if (blob.getContentEncoding() != null) {
-                attributes.put(CONTENT_ENCODING_ATTR, blob.getContentEncoding());
-            }
-
-            if (blob.getContentLanguage() != null) {
-                attributes.put(CONTENT_LANGUAGE_ATTR, blob.getContentLanguage());
-            }
-
-            if (blob.getContentType() != null) {
-                attributes.put(CoreAttributes.MIME_TYPE.key(), blob.getContentType());
-            }
-
-            if (blob.getCrc32c() != null) {
-                attributes.put(CRC32C_ATTR, blob.getCrc32c());
-            }
-
-            if (blob.getCustomerEncryption() != null) {
-                final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
-
-                attributes.put(ENCRYPTION_ALGORITHM_ATTR, encryption.getEncryptionAlgorithm());
-                attributes.put(ENCRYPTION_SHA256_ATTR, encryption.getKeySha256());
-            }
-
-            if (blob.getEtag() != null) {
-                attributes.put(ETAG_ATTR, blob.getEtag());
-            }
-
-            if (blob.getGeneratedId() != null) {
-                attributes.put(GENERATED_ID_ATTR, blob.getGeneratedId());
-            }
-
-            if (blob.getGeneration() != null) {
-                attributes.put(GENERATION_ATTR, String.valueOf(blob.getGeneration()));
-            }
-
-            if (blob.getMd5() != null) {
-                attributes.put(MD5_ATTR, blob.getMd5());
-            }
-
-            if (blob.getMediaLink() != null) {
-                attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink());
-            }
-
-            if (blob.getMetageneration() != null) {
-                attributes.put(METAGENERATION_ATTR, String.valueOf(blob.getMetageneration()));
-            }
-
-            if (blob.getOwner() != null) {
-                final Acl.Entity entity = blob.getOwner();
-
-                if (entity instanceof Acl.User) {
-                    attributes.put(OWNER_ATTR, ((Acl.User) entity).getEmail());
-                    attributes.put(OWNER_TYPE_ATTR, "user");
-                } else if (entity instanceof Acl.Group) {
-                    attributes.put(OWNER_ATTR, ((Acl.Group) entity).getEmail());
-                    attributes.put(OWNER_TYPE_ATTR, "group");
-                } else if (entity instanceof Acl.Domain) {
-                    attributes.put(OWNER_ATTR, ((Acl.Domain) entity).getDomain());
-                    attributes.put(OWNER_TYPE_ATTR, "domain");
-                } else if (entity instanceof Acl.Project) {
-                    attributes.put(OWNER_ATTR, ((Acl.Project) entity).getProjectId());
-                    attributes.put(OWNER_TYPE_ATTR, "project");
-                }
-            }
-
-            if (blob.getSelfLink() != null) {
-                attributes.put(URI_ATTR, blob.getSelfLink());
-            }
-
-            if (blob.getContentDisposition() != null) {
-                attributes.put(CONTENT_DISPOSITION_ATTR, blob.getContentDisposition());
-
-                final Util.ParsedContentDisposition parsedContentDisposition = Util.parseContentDisposition(blob.getContentDisposition());
-
-                if (parsedContentDisposition != null) {
-                    attributes.put(CoreAttributes.FILENAME.key(), parsedContentDisposition.getFileName());
-                }
-            }
-
-            if (blob.getCreateTime() != null) {
-                attributes.put(CREATE_TIME_ATTR, String.valueOf(blob.getCreateTime()));
-            }
-
-            if (blob.getUpdateTime() != null) {
-                attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTime()));
-            }
-
+            final Map<String, String> attributes = StorageAttributes.createAttributes(blob);
+            flowFile = session.putAllAttributes(flowFile, attributes);
         } catch (StorageException e) {
-            getLogger().error(e.getMessage(), e);
+            getLogger().error("Failed to fetch GCS Object due to {}", new Object[] {e}, e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
 
-        if (!attributes.isEmpty()) {
-            flowFile = session.putAllAttributes(flowFile, attributes);
-        }
         session.transfer(flowFile, REL_SUCCESS);
 
         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
-        session.getProvenanceReporter().fetch(
-                flowFile,
-                "https://" + bucketName + ".storage.googleapis.com/" + key,
-                millis);
+        session.getProvenanceReporter().fetch(flowFile, "https://" + bucketName + ".storage.googleapis.com/" + key, millis);
     }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index f4cdb27..628c241 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -25,9 +25,9 @@ import com.google.common.collect.ImmutableList;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -38,14 +38,26 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -174,18 +186,29 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             .description("Specifies whether to use GCS Generations, if applicable.  If false, only the latest version of each object will be returned.")
             .build();
 
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
+            "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
+        .required(false)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
-                .addAll(super.getSupportedPropertyDescriptors())
-                .add(BUCKET)
-                .add(PREFIX)
-                .add(USE_GENERATIONS)
-                .build();
+            .add(BUCKET)
+            .add(RECORD_WRITER)
+            .addAll(super.getSupportedPropertyDescriptors())
+            .add(PREFIX)
+            .add(USE_GENERATIONS)
+            .build();
     }
 
-    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
-            new HashSet<>(Collections.singletonList(REL_SUCCESS)));
+    private static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
+
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -194,8 +217,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
     // State tracking
     public static final String CURRENT_TIMESTAMP = "currentTimestamp";
     public static final String CURRENT_KEY_PREFIX = "key-";
-    protected long currentTimestamp = 0L;
-    protected Set<String> currentKeys;
+    private volatile long currentTimestamp = 0L;
+    private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());
 
 
     private Set<String> extractKeys(final StateMap stateMap) {
@@ -209,21 +232,24 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
         if (stateMap.getVersion() == -1L || stateMap.get(CURRENT_TIMESTAMP) == null || stateMap.get(CURRENT_KEY_PREFIX+"0") == null) {
             currentTimestamp = 0L;
-            currentKeys = new HashSet<>();
+            currentKeys.clear();
         } else {
             currentTimestamp = Long.parseLong(stateMap.get(CURRENT_TIMESTAMP));
-            currentKeys = extractKeys(stateMap);
+            currentKeys.clear();
+            currentKeys.addAll(extractKeys(stateMap));
         }
     }
 
-    void persistState(final ProcessContext context) {
-        Map<String, String> state = new HashMap<>();
-        state.put(CURRENT_TIMESTAMP, String.valueOf(currentTimestamp));
+    void persistState(final ProcessContext context, final long timestamp, final Set<String> keys) {
+        final Map<String, String> state = new HashMap<>();
+        state.put(CURRENT_TIMESTAMP, String.valueOf(timestamp));
+
         int i = 0;
-        for (String key : currentKeys) {
+        for (final String key : keys) {
             state.put(CURRENT_KEY_PREFIX+i, key);
             i++;
         }
+
         try {
             context.getStateManager().setState(state, Scope.CLUSTER);
         } catch (IOException ioe) {
@@ -231,6 +257,14 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         }
     }
 
+    Set<String> getStateKeys() {
+        return Collections.unmodifiableSet(currentKeys);
+    }
+
+    long getStateTimestamp() {
+        return currentTimestamp;
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         try {
@@ -244,16 +278,13 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         final long startNanos = System.nanoTime();
 
         final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
-
         final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
-
         final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
 
-        List<Storage.BlobListOption> listOptions = new ArrayList<>();
+        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
         if (prefix != null) {
             listOptions.add(Storage.BlobListOption.prefix(prefix));
         }
-
         if (useGenerations) {
             listOptions.add(Storage.BlobListOption.versions(true));
         }
@@ -261,159 +292,297 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         final Storage storage = getCloudService();
 
         long maxTimestamp = 0L;
-        Set<String> maxKeys = new HashSet<>();
+        final Set<String> maxKeys = new HashSet<>();
+
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final BlobWriter writer;
+        if (writerFactory == null) {
+            writer = new AttributeBlobWriter(session);
+        } else {
+            writer = new RecordBlobWriter(session, writerFactory, getLogger());
+        }
 
+        try {
+            writer.beginListing();
 
-        Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[listOptions.size()]));
-        do {
+            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
             int listCount = 0;
 
-            for (Blob blob : blobPage.getValues()) {
-                long lastModified = blob.getUpdateTime();
-                if (lastModified < currentTimestamp
-                        || lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
-                    continue;
-                }
+            do {
+                for (final Blob blob : blobPage.getValues()) {
+                    long lastModified = blob.getUpdateTime();
+                    if (lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
+                        continue;
+                    }
 
-                // Create attributes
-                final Map<String, String> attributes = new HashMap<>();
+                    writer.addToListing(blob);
 
-                attributes.put(BUCKET_ATTR, blob.getBucket());
-                attributes.put(KEY_ATTR, blob.getName());
+                    // Update state
+                    if (lastModified > maxTimestamp) {
+                        maxTimestamp = lastModified;
+                        maxKeys.clear();
+                    }
+                    if (lastModified == maxTimestamp) {
+                        maxKeys.add(blob.getName());
+                    }
 
-                if (blob.getSize() != null) {
-                    attributes.put(SIZE_ATTR, String.valueOf(blob.getSize()));
+                    listCount++;
                 }
 
-                if (blob.getCacheControl() != null) {
-                    attributes.put(CACHE_CONTROL_ATTR, blob.getCacheControl());
+                if (writer.isCheckpoint()) {
+                    commit(session, listCount);
+                    listCount = 0;
                 }
 
-                if (blob.getComponentCount() != null) {
-                    attributes.put(COMPONENT_COUNT_ATTR, String.valueOf(blob.getComponentCount()));
-                }
+                blobPage = blobPage.getNextPage();
+            } while (blobPage != null);
 
-                if (blob.getContentDisposition() != null) {
-                    attributes.put(CONTENT_DISPOSITION_ATTR, blob.getContentDisposition());
-                }
+            writer.finishListing();
+            commit(session, listCount);
 
-                if (blob.getContentEncoding() != null) {
-                    attributes.put(CONTENT_ENCODING_ATTR, blob.getContentEncoding());
-                }
+            if (maxTimestamp != 0) {
+                currentTimestamp = maxTimestamp;
+                currentKeys.clear();
+                currentKeys.addAll(maxKeys);
+                persistState(context, currentTimestamp, currentKeys);
+            } else {
+                getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
+                context.yield();
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
+            writer.finishListingExceptionally(e);
+            session.rollback();
+            context.yield();
+            return;
+        }
 
-                if (blob.getContentLanguage() != null) {
-                    attributes.put(CONTENT_LANGUAGE_ATTR, blob.getContentLanguage());
-                }
+        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
+    }
 
-                if (blob.getContentType() != null) {
-                    attributes.put(CoreAttributes.MIME_TYPE.key(), blob.getContentType());
-                }
+    private void commit(final ProcessSession session, int listCount) {
+        if (listCount > 0) {
+            getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
+            session.commit();
+        }
+    }
 
-                if (blob.getCrc32c() != null) {
-                    attributes.put(CRC32C_ATTR, blob.getCrc32c());
-                }
 
-                if (blob.getCustomerEncryption() != null) {
-                    final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
+    private interface BlobWriter {
+        void beginListing() throws IOException, SchemaNotFoundException;
 
-                    attributes.put(ENCRYPTION_ALGORITHM_ATTR, encryption.getEncryptionAlgorithm());
-                    attributes.put(ENCRYPTION_SHA256_ATTR, encryption.getKeySha256());
-                }
+        void addToListing(Blob blob) throws IOException;
 
-                if (blob.getEtag() != null) {
-                    attributes.put(ETAG_ATTR, blob.getEtag());
-                }
+        void finishListing() throws IOException;
 
-                if (blob.getGeneratedId() != null) {
-                    attributes.put(GENERATED_ID_ATTR, blob.getGeneratedId());
-                }
+        void finishListingExceptionally(Exception cause);
 
-                if (blob.getGeneration() != null) {
-                    attributes.put(GENERATION_ATTR, String.valueOf(blob.getGeneration()));
-                }
+        boolean isCheckpoint();
+    }
 
-                if (blob.getMd5() != null) {
-                    attributes.put(MD5_ATTR, blob.getMd5());
-                }
 
-                if (blob.getMediaLink() != null) {
-                    attributes.put(MEDIA_LINK_ATTR, blob.getMediaLink());
-                }
+    static class RecordBlobWriter implements BlobWriter {
+        private static final RecordSchema RECORD_SCHEMA;
+
+        public static final String BUCKET = "bucket";
+        public static final String NAME = "name";
+        public static final String SIZE = "size";
+        public static final String CACHE_CONTROL = "cacheControl";
+        public static final String COMPONENT_COUNT = "componentCount";
+        public static final String CONTENT_DISPOSITION = "contentDisposition";
+        public static final String CONTENT_ENCODING = "contentEncoding";
+        public static final String CONTENT_LANGUAGE = "contentLanguage";
+        public static final String CRC32C = "crc32c";
+        public static final String CREATE_TIME = "createTime";
+        public static final String UPDATE_TIME = "updateTime";
+        public static final String ENCRYPTION_ALGORITHM = "encryptionAlgorithm";
+        public static final String ENCRYPTION_KEY_SHA256 = "encryptionKeySha256";
+        public static final String ETAG = "etag";
+        public static final String GENERATED_ID = "generatedId";
+        public static final String GENERATION = "generation";
+        public static final String MD5 = "md5";
+        public static final String MEDIA_LINK = "mediaLink";
+        public static final String METAGENERATION = "metageneration";
+        public static final String OWNER = "owner";
+        public static final String OWNER_TYPE = "ownerType";
+        public static final String URI = "uri";
+
+        static {
+            final List<RecordField> fields = new ArrayList<>();
+            fields.add(new RecordField(BUCKET, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(NAME, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType()));
+            fields.add(new RecordField(CACHE_CONTROL, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(COMPONENT_COUNT, RecordFieldType.INT.getDataType()));
+            fields.add(new RecordField(CONTENT_DISPOSITION, RecordFieldType.LONG.getDataType()));
+            fields.add(new RecordField(CONTENT_ENCODING, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(CONTENT_LANGUAGE, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(CRC32C, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(CREATE_TIME, RecordFieldType.TIMESTAMP.getDataType()));
+            fields.add(new RecordField(UPDATE_TIME, RecordFieldType.TIMESTAMP.getDataType()));
+            fields.add(new RecordField(ENCRYPTION_ALGORITHM, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(ENCRYPTION_KEY_SHA256, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(ETAG, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(GENERATED_ID, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(GENERATION, RecordFieldType.LONG.getDataType()));
+            fields.add(new RecordField(MD5, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(MEDIA_LINK, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(METAGENERATION, RecordFieldType.LONG.getDataType()));
+            fields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(OWNER_TYPE, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(URI, RecordFieldType.STRING.getDataType()));
+
+            RECORD_SCHEMA = new SimpleRecordSchema(fields);
+        }
 
-                if (blob.getMetageneration() != null) {
-                    attributes.put(METAGENERATION_ATTR, String.valueOf(blob.getMetageneration()));
-                }
 
-                if (blob.getOwner() != null) {
-                    final Acl.Entity entity = blob.getOwner();
-
-                    if (entity instanceof Acl.User) {
-                        attributes.put(OWNER_ATTR, ((Acl.User) entity).getEmail());
-                        attributes.put(OWNER_TYPE_ATTR, "user");
-                    } else if (entity instanceof Acl.Group) {
-                        attributes.put(OWNER_ATTR, ((Acl.Group) entity).getEmail());
-                        attributes.put(OWNER_TYPE_ATTR, "group");
-                    } else if (entity instanceof Acl.Domain) {
-                        attributes.put(OWNER_ATTR, ((Acl.Domain) entity).getDomain());
-                        attributes.put(OWNER_TYPE_ATTR, "domain");
-                    } else if (entity instanceof Acl.Project) {
-                        attributes.put(OWNER_ATTR, ((Acl.Project) entity).getProjectId());
-                        attributes.put(OWNER_TYPE_ATTR, "project");
-                    }
-                }
+        private final ProcessSession session;
+        private final RecordSetWriterFactory writerFactory;
+        private final ComponentLog logger;
+        private RecordSetWriter recordWriter;
+        private FlowFile flowFile;
 
-                if (blob.getSelfLink() != null) {
-                    attributes.put(URI_ATTR, blob.getSelfLink());
-                }
+        public RecordBlobWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+            this.session = session;
+            this.writerFactory = writerFactory;
+            this.logger = logger;
+        }
 
-                attributes.put(CoreAttributes.FILENAME.key(), blob.getName());
+        @Override
+        public void beginListing() throws IOException, SchemaNotFoundException {
+            flowFile = session.create();
 
-                if (blob.getCreateTime() != null) {
-                    attributes.put(CREATE_TIME_ATTR, String.valueOf(blob.getCreateTime()));
-                }
+            final OutputStream out = session.write(flowFile);
+            recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile);
+            recordWriter.beginRecordSet();
+        }
 
-                if (blob.getUpdateTime() != null) {
-                    attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTime()));
-                }
+        @Override
+        public void addToListing(final Blob blob) throws IOException {
+            recordWriter.write(createRecordForListing(blob));
+        }
 
-                // Create the flowfile
-                FlowFile flowFile = session.create();
+        @Override
+        public void finishListing() throws IOException {
+            final WriteResult writeResult = recordWriter.finishRecordSet();
+            recordWriter.close();
+
+            if (writeResult.getRecordCount() == 0) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                 flowFile = session.putAllAttributes(flowFile, attributes);
+
                 session.transfer(flowFile, REL_SUCCESS);
+            }
+        }
 
-                // Update state
-                if (lastModified > maxTimestamp) {
-                    maxTimestamp = lastModified;
-                    maxKeys.clear();
-                }
-                if (lastModified == maxTimestamp) {
-                    maxKeys.add(blob.getName());
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+            try {
+                recordWriter.close();
+            } catch (IOException e) {
+                logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e);
+            }
+
+            session.remove(flowFile);
+        }
+
+        @Override
+        public boolean isCheckpoint() {
+            return false;
+        }
+
+        private Record createRecordForListing(final Blob blob) {
+            final Map<String, Object> values = new HashMap<>();
+            values.put(BUCKET, blob.getBucket());
+            values.put(NAME, blob.getName());
+            values.put(SIZE, blob.getSize());
+            values.put(CACHE_CONTROL, blob.getCacheControl());
+            values.put(COMPONENT_COUNT, blob.getComponentCount());
+            values.put(CONTENT_DISPOSITION, blob.getContentDisposition());
+            values.put(CONTENT_ENCODING, blob.getContentEncoding());
+            values.put(CONTENT_LANGUAGE, blob.getContentLanguage());
+            values.put(CRC32C, blob.getCrc32c());
+            values.put(CREATE_TIME, blob.getCreateTime() == null ? null : new Timestamp(blob.getCreateTime()));
+            values.put(UPDATE_TIME, blob.getUpdateTime() == null ? null : new Timestamp(blob.getUpdateTime()));
+
+            final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
+            if (encryption != null) {
+                values.put(ENCRYPTION_ALGORITHM, encryption.getEncryptionAlgorithm());
+                values.put(ENCRYPTION_KEY_SHA256, encryption.getKeySha256());
+            }
+
+            values.put(ETAG, blob.getEtag());
+            values.put(GENERATED_ID, blob.getGeneratedId());
+            values.put(GENERATION, blob.getGeneration());
+            values.put(MD5, blob.getMd5());
+            values.put(MEDIA_LINK, blob.getMediaLink());
+            values.put(METAGENERATION, blob.getMetageneration());
+
+            final Acl.Entity owner = blob.getOwner();
+            if (owner != null) {
+                if (owner instanceof Acl.User) {
+                    values.put(OWNER, ((Acl.User) owner).getEmail());
+                    values.put(OWNER_TYPE, "user");
+                } else if (owner instanceof Acl.Group) {
+                    values.put(OWNER, ((Acl.Group) owner).getEmail());
+                    values.put(OWNER_TYPE, "group");
+                } else if (owner instanceof Acl.Domain) {
+                    values.put(OWNER, ((Acl.Domain) owner).getDomain());
+                    values.put(OWNER_TYPE, "domain");
+                } else if (owner instanceof Acl.Project) {
+                    values.put(OWNER, ((Acl.Project) owner).getProjectId());
+                    values.put(OWNER_TYPE, "project");
                 }
-                listCount++;
             }
 
-            commit(context, session, listCount);
+            values.put(URI, blob.getSelfLink());
 
-            blobPage = blobPage.getNextPage();
-        } while (blobPage != null);
+            return new MapRecord(RECORD_SCHEMA, values);
+        }
+    }
 
-        if (maxTimestamp != 0) {
-            currentTimestamp = maxTimestamp;
-            currentKeys = maxKeys;
-            persistState(context);
-        } else {
-            getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", new Object[]{bucket});
-            context.yield();
+
+
+    /**
+     * Writes Blobs by creating a new FlowFile for each blob and writing information as FlowFile attributes
+     */
+    private static class AttributeBlobWriter implements BlobWriter {
+        private final ProcessSession session;
+
+        public AttributeBlobWriter(final ProcessSession session) {
+            this.session = session;
         }
 
-        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
-    }
+        @Override
+        public void beginListing() {
+        }
 
-    private void commit(final ProcessContext context, final ProcessSession session, int listCount) {
-        if (listCount > 0) {
-            getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
-            session.commit();
+        @Override
+        public void addToListing(final Blob blob) {
+            final Map<String, String> attributes = StorageAttributes.createAttributes(blob);
+
+            // Create the flowfile
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+
+        @Override
+        public void finishListing() {
+        }
+
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+        }
+
+        @Override
+        public boolean isCheckpoint() {
+            return true;
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/StorageAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/StorageAttributes.java
index 46ef207..5d6450b 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/StorageAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/StorageAttributes.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
+import com.google.cloud.storage.Acl;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobInfo;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Common attributes being written and accessed through Google Cloud Storage.
@@ -92,4 +99,76 @@ public class StorageAttributes {
 
     public static final String URI_ATTR = "gcs.uri";
     public static final String URI_DESC = "The URI of the object as a string.";
+
+    public static Map<String, String> createAttributes(final Blob blob) {
+        final Map<String, String> attributes = new HashMap<>();
+
+        addAttribute(attributes, BUCKET_ATTR, blob.getBucket());
+        addAttribute(attributes, KEY_ATTR, blob.getName());
+
+        addAttribute(attributes, SIZE_ATTR, blob.getSize());
+        addAttribute(attributes, CACHE_CONTROL_ATTR, blob.getCacheControl());
+        addAttribute(attributes, COMPONENT_COUNT_ATTR, blob.getComponentCount());
+        addAttribute(attributes, CONTENT_DISPOSITION_ATTR, blob.getContentDisposition());
+        addAttribute(attributes, CONTENT_ENCODING_ATTR, blob.getContentEncoding());
+        addAttribute(attributes, CONTENT_LANGUAGE_ATTR, blob.getContentLanguage());
+        addAttribute(attributes, CoreAttributes.MIME_TYPE.key(), blob.getContentType());
+        addAttribute(attributes, CRC32C_ATTR, blob.getCrc32c());
+
+        if (blob.getCustomerEncryption() != null) {
+            final BlobInfo.CustomerEncryption encryption = blob.getCustomerEncryption();
+
+            addAttribute(attributes, ENCRYPTION_ALGORITHM_ATTR, encryption.getEncryptionAlgorithm());
+            addAttribute(attributes, ENCRYPTION_SHA256_ATTR, encryption.getKeySha256());
+        }
+
+        addAttribute(attributes, ETAG_ATTR, blob.getEtag());
+        addAttribute(attributes, GENERATED_ID_ATTR, blob.getGeneratedId());
+        addAttribute(attributes, GENERATION_ATTR, blob.getGeneration());
+        addAttribute(attributes, MD5_ATTR, blob.getMd5());
+        addAttribute(attributes, MEDIA_LINK_ATTR, blob.getMediaLink());
+        addAttribute(attributes, METAGENERATION_ATTR, blob.getMetageneration());
+
+        if (blob.getOwner() != null) {
+            final Acl.Entity entity = blob.getOwner();
+
+            if (entity instanceof Acl.User) {
+                addAttribute(attributes, OWNER_ATTR, ((Acl.User) entity).getEmail());
+                addAttribute(attributes, OWNER_TYPE_ATTR, "user");
+            } else if (entity instanceof Acl.Group) {
+                addAttribute(attributes, OWNER_ATTR, ((Acl.Group) entity).getEmail());
+                addAttribute(attributes, OWNER_TYPE_ATTR, "group");
+            } else if (entity instanceof Acl.Domain) {
+                addAttribute(attributes, OWNER_ATTR, ((Acl.Domain) entity).getDomain());
+                addAttribute(attributes, OWNER_TYPE_ATTR, "domain");
+            } else if (entity instanceof Acl.Project) {
+                addAttribute(attributes, OWNER_ATTR, ((Acl.Project) entity).getProjectId());
+                addAttribute(attributes, OWNER_TYPE_ATTR, "project");
+            }
+        }
+
+        addAttribute(attributes, URI_ATTR, blob.getSelfLink());
+        addAttribute(attributes, CoreAttributes.FILENAME.key(), blob.getName());
+        addAttribute(attributes, CREATE_TIME_ATTR, blob.getCreateTime());
+        addAttribute(attributes, UPDATE_TIME_ATTR, blob.getUpdateTime());
+
+        return attributes;
+    }
+
+    private static void addAttribute(final Map<String, String> attributes, final String key, final Object value) {
+        if (value == null) {
+            return;
+        }
+
+        attributes.put(key, value.toString());
+    }
+
+    private static void addAttribute(final Map<String, String> attributes, final String key, final String value) {
+        if (value == null) {
+            return;
+        }
+
+        attributes.put(key, value);
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.storage.ListGCSBucket/additionalDetails.html b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.storage.ListGCSBucket/additionalDetails.html
new file mode 100644
index 0000000..27aa2af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.storage.ListGCSBucket/additionalDetails.html
@@ -0,0 +1,144 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>ListGCSBucket</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+<body>
+
+<h1>Streaming Versus Batch Processing</h1>
+
+<p>
+    ListGCSBucket performs a listing of all GCS Objects that it encounters in the configured GCS bucket.
+    There are two common, broadly defined use cases.
+</p>
+
+<h3>Streaming Use Case</h3>
+
+<p>
+    By default, the Processor will create a separate FlowFile for each object in the bucket and add attributes for filename, bucket, etc.
+    A common use case is to connect ListGCSBucket to the FetchGCSObject processor. These two processors used in conjunction with one another provide the ability to
+    easily monitor a bucket and fetch the contents of any new object as it lands in GCS in an efficient streaming fashion.
+</p>
+
+<h3>Batch Use Case</h3>
+<p>
+    Another common use case is the desire to process all newly arriving objects in a given bucket, and to then perform some action
+    only when all objects have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently
+    a streaming platform in that there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.
+</p>
+
+<p>
+    To solve this, the ListGCSBucket Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single
+    FlowFile will be created that will contain a Record for each object in the bucket, instead of a separate FlowFile per object.
+    See the documentation for ListFile for an example of how to build a dataflow that allows for processing all of the objects before proceeding
+    with any other step.
+</p>
+
+<p>
+    One important difference between the data produced by ListFile and ListGCSBucket, though, is the structure of the Records that are emitted. The Records
+    emitted by ListFile have a different schema than those emitted by ListGCSBucket. ListGCSBucket emits records that follow the following schema (in Avro format):
+</p>
+
+<code>
+    <pre>
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [{
+    "name": "bucket",
+    "type": "string"
+  }, {
+    "name": "name",
+    "type": "string"
+  }, {
+    "name": "size",
+    "type": ["null", "long"]
+  }, {
+    "name": "cacheControl",
+    "type": ["null", "string"]
+  }, {
+    "name": "componentCount",
+    "type": ["null", "int"]
+  }, {
+    "name": "contentDisposition",
+    "type": ["null", "long"]
+  }, {
+    "name": "contentEncoding",
+    "type": ["null", "string"]
+  }, {
+    "name": "contentLanguage",
+    "type": ["null", "string"]
+  }, {
+    "name": "crc32c",
+    "type": ["null", "string"]
+  }, {
+    "name": "createTime",
+    "type": ["null", {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }]
+  }, {
+    "name": "updateTime",
+    "type": ["null", {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }]
+  }, {
+    "name": "encryptionAlgorithm",
+    "type": ["null", "string"]
+  }, {
+    "name": "encryptionKeySha256",
+    "type": ["null", "string"]
+  }, {
+    "name": "etag",
+    "type": ["null", "string"]
+  }, {
+    "name": "generatedId",
+    "type": ["null", "string"]
+  }, {
+    "name": "generation",
+    "type": ["null", "long"]
+  }, {
+    "name": "md5",
+    "type": ["null", "string"]
+  }, {
+    "name": "mediaLink",
+    "type": ["null", "string"]
+  }, {
+    "name": "metageneration",
+    "type": ["null", "long"]
+  }, {
+    "name": "owner",
+    "type": ["null", "string"]
+  }, {
+    "name": "ownerType",
+    "type": ["null", "string"]
+  }, {
+    "name": "uri",
+    "type": ["null", "string"]
+  }]
+}
+    </pre>
+</code>
+
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index 1a925d0..a355755 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -37,6 +37,7 @@ import org.mockito.Mock;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
 import static org.apache.nifi.processors.gcp.storage.StorageAttributes.CACHE_CONTROL_ATTR;
@@ -62,7 +63,6 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
@@ -130,22 +130,15 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
 
-        assertEquals("Cluster StateMap should be fresh (version -1L)",
-                -1L,
-                runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion()
-        );
-
-        assertNull(processor.currentKeys);
+        assertEquals("Cluster StateMap should be fresh (version -1L)", -1L, runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion());
+        assertTrue(processor.getStateKeys().isEmpty());
 
         processor.restoreState(runner.getProcessContext());
 
-        assertNotNull(processor.currentKeys);
-        assertEquals(
-                0L,
-                processor.currentTimestamp
-        );
+        assertTrue(processor.getStateKeys().isEmpty());
+        assertEquals(0L, processor.getStateTimestamp());
 
-        assertTrue(processor.currentKeys.isEmpty());
+        assertTrue(processor.getStateKeys().isEmpty());
 
     }
 
@@ -165,21 +158,15 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         runner.getStateManager().setState(state, Scope.CLUSTER);
 
-        assertNull(processor.currentKeys);
-        assertEquals(
-                0L,
-                processor.currentTimestamp
-        );
+        assertTrue(processor.getStateKeys().isEmpty());
+        assertEquals(0L, processor.getStateTimestamp());
 
         processor.restoreState(runner.getProcessContext());
 
-        assertNotNull(processor.currentKeys);
-        assertTrue(processor.currentKeys.contains("test-key-0"));
-        assertTrue(processor.currentKeys.contains("test-key-1"));
-        assertEquals(
-                4L,
-                processor.currentTimestamp
-        );
+        assertNotNull(processor.getStateKeys());
+        assertTrue(processor.getStateKeys().contains("test-key-0"));
+        assertTrue(processor.getStateKeys().contains("test-key-1"));
+        assertEquals(4L, processor.getStateTimestamp());
     }
 
     @Test
@@ -195,21 +182,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion()
         );
 
-        processor.currentKeys = ImmutableSet.of(
-                "test-key-0",
-                "test-key-1"
-        );
-
-        processor.currentTimestamp = 4L;
-
-        processor.persistState(runner.getProcessContext());
+        final Set<String> keys = ImmutableSet.of("test-key-0", "test-key-1");
+        processor.persistState(runner.getProcessContext(), 4L, keys);
 
         final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(
-                "Cluster StateMap should have been written to",
-                1L,
-                stateMap.getVersion()
-        );
+        assertEquals("Cluster StateMap should have been written to", 1L, stateMap.getVersion());
 
         assertEquals(
                 ImmutableMap.of(
@@ -231,16 +208,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true);
 
-        processor.currentKeys = ImmutableSet.of(
-                "test-key-0",
-                "test-key-1"
-        );
-
-        processor.currentTimestamp = 4L;
+        final Set<String> keys = ImmutableSet.of("test-key-0", "test-key-1");
 
         assertTrue(runner.getLogger().getErrorMessages().isEmpty());
 
-        processor.persistState(runner.getProcessContext());
+        processor.persistState(runner.getProcessContext(), 4L, keys);
 
         // The method should have caught the error and reported it to the logger.
         final List<LogMessage> logMessages = runner.getLogger().getErrorMessages();
@@ -279,13 +251,9 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
         );
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -296,49 +264,18 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
-        assertEquals(
-                "blob-bucket-1",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-1",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "2",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
+        assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR));
+        assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
 
         flowFile = successes.get(1);
-        assertEquals(
-                "blob-bucket-2",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-2",
-                flowFile.getAttribute(KEY_ATTR)
-        );
+        assertEquals("blob-bucket-2", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-2",flowFile.getAttribute(KEY_ATTR));
+        assertEquals("3", flowFile.getAttribute(UPDATE_TIME_ATTR));
 
-        assertEquals(
-                "3",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
-
-        assertEquals(
-                3L,
-                processor.currentTimestamp
-        );
-
-        assertEquals(
-                ImmutableSet.of(
-                        "blob-key-2"
-                ),
-                processor.currentKeys
-        );
+        assertEquals(3L, processor.getStateTimestamp());
 
+        assertEquals(ImmutableSet.of("blob-key-2"), processor.getStateKeys());
     }
 
     @Test
@@ -353,13 +290,9 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
         );
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.enqueue("test2");
@@ -368,16 +301,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
 
-        assertEquals(
-                "blob-key-1",
-                runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_KEY_PREFIX+"0")
-        );
-
-        assertEquals(
-                "2",
-                runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP)
-        );
-
+        assertEquals("blob-key-1", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_KEY_PREFIX+"0"));
+        assertEquals("2", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
     }
 
 
@@ -392,24 +317,16 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0);
 
-        assertEquals(
-                "No state should be persisted on an empty return",
-                -1L,
-                runner.getStateManager().getState(Scope.CLUSTER).getVersion()
-        );
+        assertEquals("No state should be persisted on an empty return", -1L, runner.getStateManager().getState(Scope.CLUSTER).getVersion());
     }
 
     @Test
@@ -420,10 +337,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
 
-        final Map<String, String> state = ImmutableMap.of(
-                ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
-                ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1"
-        );
+        final Map<String, String> state = ImmutableMap.of(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L), ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
 
         runner.getStateManager().setState(state, Scope.CLUSTER);
 
@@ -432,13 +346,9 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 buildMockBlob("blob-bucket-2", "blob-key-2", 2L)
         );
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
@@ -449,32 +359,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
-        assertEquals(
-                "blob-bucket-2",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-2",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "2",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
-
-        assertEquals(
-                2L,
-                processor.currentTimestamp
-        );
-
-        assertEquals(
-                ImmutableSet.of(
-                        "blob-key-2"
-                ),
-                processor.currentKeys
-        );
+        assertEquals("blob-bucket-2", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-2", flowFile.getAttribute(KEY_ATTR));
+        assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
+        assertEquals(2L, processor.getStateTimestamp());
+        assertEquals(ImmutableSet.of("blob-key-2"), processor.getStateKeys());
     }
 
     @Test
@@ -514,32 +403,12 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
-        assertEquals(
-                "blob-bucket-1",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
+        assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR));
+        assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
+        assertEquals(2L, processor.getStateTimestamp());
 
-        assertEquals(
-                "blob-key-1",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "2",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
-
-        assertEquals(
-                2L,
-                processor.currentTimestamp
-        );
-
-        assertEquals(
-                ImmutableSet.of(
-                        "blob-key-1"
-                ),
-                processor.currentKeys
-        );
+        assertEquals(ImmutableSet.of("blob-key-1"), processor.getStateKeys());
     }
 
     @Test
@@ -580,49 +449,16 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
-        assertEquals(
-                "blob-bucket-1",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-1",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "2",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
+        assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-1", flowFile.getAttribute(KEY_ATTR));
+        assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
 
         flowFile = successes.get(1);
-        assertEquals(
-                "blob-bucket-3",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-3",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "2",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
-
-        assertEquals(
-                2L,
-                processor.currentTimestamp
-        );
-
-        assertEquals(
-                ImmutableSet.of(
-                        "blob-key-1",
-                        "blob-key-3"
-                ),
-                processor.currentKeys
-        );
+        assertEquals("blob-bucket-3", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-3",flowFile.getAttribute(KEY_ATTR));
+        assertEquals("2", flowFile.getAttribute(UPDATE_TIME_ATTR));
+        assertEquals(2L, processor.getStateTimestamp());
+        assertEquals(ImmutableSet.of("blob-key-1", "blob-key-3"), processor.getStateKeys());
     }
 
     @Test
@@ -663,49 +499,16 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
-        assertEquals(
-                "blob-bucket-1",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-1",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "1",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
+        assertEquals("blob-bucket-1", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-1",flowFile.getAttribute(KEY_ATTR));
+        assertEquals("1",flowFile.getAttribute(UPDATE_TIME_ATTR));
 
         flowFile = successes.get(1);
-        assertEquals(
-                "blob-bucket-3",
-                flowFile.getAttribute(BUCKET_ATTR)
-        );
-
-        assertEquals(
-                "blob-key-3",
-                flowFile.getAttribute(KEY_ATTR)
-        );
-
-        assertEquals(
-                "1",
-                flowFile.getAttribute(UPDATE_TIME_ATTR)
-        );
-
-        assertEquals(
-                1L,
-                processor.currentTimestamp
-        );
-
-        assertEquals(
-                ImmutableSet.of(
-                        "blob-key-1",
-                        "blob-key-3"
-                ),
-                processor.currentKeys
-        );
+        assertEquals("blob-bucket-3", flowFile.getAttribute(BUCKET_ATTR));
+        assertEquals("blob-key-3", flowFile.getAttribute(KEY_ATTR));
+        assertEquals("1", flowFile.getAttribute(UPDATE_TIME_ATTR));
+        assertEquals(1L, processor.getStateTimestamp());
+        assertEquals(ImmutableSet.of("blob-key-1", "blob-key-3"), processor.getStateKeys());
     }
 
     @Test
@@ -743,110 +546,35 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
-
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS);
         runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1);
         final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
-        assertEquals(
-                CACHE_CONTROL,
-                flowFile.getAttribute(CACHE_CONTROL_ATTR)
-        );
-
-        assertEquals(
-                COMPONENT_COUNT,
-                Integer.valueOf(flowFile.getAttribute(COMPONENT_COUNT_ATTR))
-        );
-
-        assertEquals(
-                CONTENT_ENCODING,
-                flowFile.getAttribute(CONTENT_ENCODING_ATTR)
-        );
-
-        assertEquals(
-                CONTENT_LANGUAGE,
-                flowFile.getAttribute(CONTENT_LANGUAGE_ATTR)
-        );
-
-        assertEquals(
-                CONTENT_TYPE,
-                flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())
-        );
-
-        assertEquals(
-                CRC32C,
-                flowFile.getAttribute(CRC32C_ATTR)
-        );
-
-        assertEquals(
-                ENCRYPTION,
-                flowFile.getAttribute(ENCRYPTION_ALGORITHM_ATTR)
-        );
-
-        assertEquals(
-                ENCRYPTION_SHA256,
-                flowFile.getAttribute(ENCRYPTION_SHA256_ATTR)
-        );
-
-        assertEquals(
-                ETAG,
-                flowFile.getAttribute(ETAG_ATTR)
-        );
-
-        assertEquals(
-                GENERATED_ID,
-                flowFile.getAttribute(GENERATED_ID_ATTR)
-        );
-
-        assertEquals(
-                GENERATION,
-                Long.valueOf(flowFile.getAttribute(GENERATION_ATTR))
-        );
-
-        assertEquals(
-                MD5,
-                flowFile.getAttribute(MD5_ATTR)
-        );
-
-        assertEquals(
-                MEDIA_LINK,
-                flowFile.getAttribute(MEDIA_LINK_ATTR)
-        );
-
-        assertEquals(
-                METAGENERATION,
-                Long.valueOf(flowFile.getAttribute(METAGENERATION_ATTR))
-        );
-
-        assertEquals(
-                URI,
-                flowFile.getAttribute(URI_ATTR)
-        );
-
-        assertEquals(
-                CONTENT_DISPOSITION,
-                flowFile.getAttribute(CONTENT_DISPOSITION_ATTR)
-        );
-
-        assertEquals(
-                CREATE_TIME,
-                Long.valueOf(flowFile.getAttribute(CREATE_TIME_ATTR))
-        );
-
-        assertEquals(
-                UPDATE_TIME,
-                Long.valueOf(flowFile.getAttribute(UPDATE_TIME_ATTR))
-        );
+        assertEquals(CACHE_CONTROL, flowFile.getAttribute(CACHE_CONTROL_ATTR));
+
+        assertEquals(COMPONENT_COUNT,Integer.valueOf(flowFile.getAttribute(COMPONENT_COUNT_ATTR)));
+        assertEquals(CONTENT_ENCODING, flowFile.getAttribute(CONTENT_ENCODING_ATTR));
+        assertEquals(CONTENT_LANGUAGE, flowFile.getAttribute(CONTENT_LANGUAGE_ATTR));
+        assertEquals(CONTENT_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()));
+        assertEquals(CRC32C, flowFile.getAttribute(CRC32C_ATTR));
+        assertEquals(ENCRYPTION, flowFile.getAttribute(ENCRYPTION_ALGORITHM_ATTR));
+        assertEquals(ENCRYPTION_SHA256, flowFile.getAttribute(ENCRYPTION_SHA256_ATTR));
+        assertEquals(ETAG, flowFile.getAttribute(ETAG_ATTR));
+        assertEquals(GENERATED_ID, flowFile.getAttribute(GENERATED_ID_ATTR));
+        assertEquals(GENERATION, Long.valueOf(flowFile.getAttribute(GENERATION_ATTR)));
+        assertEquals(MD5, flowFile.getAttribute(MD5_ATTR));
+        assertEquals(MEDIA_LINK, flowFile.getAttribute(MEDIA_LINK_ATTR));
+        assertEquals(METAGENERATION, Long.valueOf(flowFile.getAttribute(METAGENERATION_ATTR)));
+        assertEquals(URI, flowFile.getAttribute(URI_ATTR));
+        assertEquals(CONTENT_DISPOSITION, flowFile.getAttribute(CONTENT_DISPOSITION_ATTR));
+        assertEquals(CREATE_TIME, Long.valueOf(flowFile.getAttribute(CREATE_TIME_ATTR)));
+        assertEquals(UPDATE_TIME, Long.valueOf(flowFile.getAttribute(UPDATE_TIME_ATTR)));
     }
 
     @Test
@@ -864,31 +592,19 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
-
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS);
         runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1);
-        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
-        assertEquals(
-                OWNER_USER_EMAIL,
-                flowFile.getAttribute(OWNER_ATTR)
-        );
-
-        assertEquals(
-                "user",
-                flowFile.getAttribute(OWNER_TYPE_ATTR)
-        );
 
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
+        assertEquals(OWNER_USER_EMAIL, flowFile.getAttribute(OWNER_ATTR));
+        assertEquals("user", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
 
@@ -907,31 +623,19 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
-
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS);
         runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1);
-        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
-        assertEquals(
-                OWNER_GROUP_EMAIL,
-                flowFile.getAttribute(OWNER_ATTR)
-        );
-
-        assertEquals(
-                "group",
-                flowFile.getAttribute(OWNER_TYPE_ATTR)
-        );
 
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
+        assertEquals(OWNER_GROUP_EMAIL, flowFile.getAttribute(OWNER_ATTR));
+        assertEquals("group", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
 
@@ -950,32 +654,19 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         when(blob.getOwner()).thenReturn(mockDomain);
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
-
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
-
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS);
         runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1);
-        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
-        assertEquals(
-                OWNER_DOMAIN,
-                flowFile.getAttribute(OWNER_ATTR)
-        );
-
-        assertEquals(
-                "domain",
-                flowFile.getAttribute(OWNER_TYPE_ATTR)
-        );
 
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
+        assertEquals(OWNER_DOMAIN, flowFile.getAttribute(OWNER_ATTR));
+        assertEquals("domain", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
 
@@ -995,31 +686,19 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of(blob);
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
-
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS);
         runner.assertTransferCount(FetchGCSObject.REL_SUCCESS, 1);
-        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
-        assertEquals(
-                OWNER_PROJECT_ID,
-                flowFile.getAttribute(OWNER_ATTR)
-        );
-
-        assertEquals(
-                "project",
-                flowFile.getAttribute(OWNER_TYPE_ATTR)
-        );
 
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS).get(0);
+        assertEquals(OWNER_PROJECT_ID, flowFile.getAttribute(OWNER_ATTR));
+        assertEquals("project", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
 
@@ -1033,23 +712,16 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
         runner.enqueue("test");
         runner.run();
 
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0);
-        assertEquals(
-                1,
-                runner.getLogger().getErrorMessages().size()
-        );
+        assertEquals(1, runner.getLogger().getErrorMessages().size());
     }
 
     @Test
@@ -1059,31 +731,19 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
 
-        runner.setProperty(
-                ListGCSBucket.PREFIX,
-                PREFIX
-        );
-
+        runner.setProperty(ListGCSBucket.PREFIX, PREFIX);
         runner.assertValid();
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), argumentCaptor.capture()))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), argumentCaptor.capture())).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
-        assertEquals(
-                Storage.BlobListOption.prefix(PREFIX),
-                argumentCaptor.getValue()
-        );
-
+        assertEquals(Storage.BlobListOption.prefix(PREFIX), argumentCaptor.getValue());
     }
 
 
@@ -1094,30 +754,19 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
 
-        runner.setProperty(
-                ListGCSBucket.USE_GENERATIONS,
-                String.valueOf(USE_GENERATIONS)
-        );
+        runner.setProperty(ListGCSBucket.USE_GENERATIONS, String.valueOf(USE_GENERATIONS));
         runner.assertValid();
 
         final Iterable<Blob> mockList = ImmutableList.of();
 
-        when(mockBlobPage.getValues())
-                .thenReturn(mockList);
-
+        when(mockBlobPage.getValues()).thenReturn(mockList);
         when(mockBlobPage.getNextPage()).thenReturn(null);
-
-        when(storage.list(anyString(), argumentCaptor.capture()))
-                .thenReturn(mockBlobPage);
+        when(storage.list(anyString(), argumentCaptor.capture())).thenReturn(mockBlobPage);
 
         runner.enqueue("test");
         runner.run();
 
         Storage.BlobListOption option = argumentCaptor.getValue();
-
-        assertEquals(
-                Storage.BlobListOption.versions(true),
-                option
-        );
+        assertEquals(Storage.BlobListOption.versions(true), option);
     }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
index 1439227..9e90425 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml
@@ -88,6 +88,16 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-properties</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 31f582f..abbceb2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -50,10 +50,22 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.security.PrivilegedExceptionAction;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -98,6 +110,37 @@ import java.util.regex.Pattern;
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 public class ListHDFS extends AbstractHadoopProcessor {
 
+    private static final RecordSchema RECORD_SCHEMA;
+    private static final String FILENAME = "filename";
+    private static final String PATH = "path";
+    private static final String IS_DIRECTORY = "directory";
+    private static final String SIZE = "size";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String PERMISSIONS = "permissions";
+    private static final String OWNER = "owner";
+    private static final String GROUP = "group";
+    private static final String REPLICATION = "replication";
+    private static final String IS_SYM_LINK = "symLink";
+    private static final String IS_ENCRYPTED = "encrypted";
+    private static final String IS_ERASURE_CODED = "erasureCoded";
+
+    static {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
+        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
     @Deprecated
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
         .name("Distributed Cache Service")
@@ -114,6 +157,15 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .defaultValue("true")
         .build();
 
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
+            "all entities will be written to a single FlowFile.")
+        .required(false)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
     public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
         .name("File Filter")
         .description("Only files whose names match the given regular expression will be picked up")
@@ -208,6 +260,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         props.add(DISTRIBUTED_CACHE_SERVICE);
         props.add(DIRECTORY);
         props.add(RECURSE_SUBDIRS);
+        props.add(RECORD_WRITER);
         props.add(FILE_FILTER);
         props.add(FILE_FILTER_MODE);
         props.add(MIN_AGE);
@@ -394,7 +447,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
             statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
         } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
+            getLogger().error("Failed to perform listing of HDFS", e);
             return;
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -405,12 +458,21 @@ public class ListHDFS extends AbstractHadoopProcessor {
         final Set<FileStatus> listable = determineListable(statuses, context);
         getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
 
-        for (final FileStatus status : listable) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, REL_SUCCESS);
+        // Create FlowFile(s) for the listing, if there are any
+        if (!listable.isEmpty()) {
+            if (context.getProperty(RECORD_WRITER).isSet()) {
+                try {
+                    createRecords(listable, context, session);
+                } catch (final IOException | SchemaNotFoundException e) {
+                    getLogger().error("Failed to write listing of HDFS", e);
+                    return;
+                }
+            } else {
+                createFlowFiles(listable, session);
+            }
+        }
 
+        for (final FileStatus status : listable) {
             final long fileModTime = status.getModificationTime();
             if (fileModTime > latestTimestampEmitted) {
                 latestTimestampEmitted = fileModTime;
@@ -438,6 +500,65 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
+    private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
+        for (final FileStatus status : fileStatuses) {
+            final Map<String, String> attributes = createAttributes(status);
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
+
+    private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        FlowFile flowFile = session.create();
+        final WriteResult writeResult;
+        try (final OutputStream out = session.write(flowFile);
+             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+
+            recordSetWriter.beginRecordSet();
+            for (final FileStatus fileStatus : fileStatuses) {
+                final Record record = createRecord(fileStatus);
+                recordSetWriter.write(record);
+            }
+
+            writeResult = recordSetWriter.finishRecordSet();
+        }
+
+        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private Record createRecord(final FileStatus fileStatus) {
+        final Map<String, Object> values = new HashMap<>();
+        values.put(FILENAME, fileStatus.getPath().getName());
+        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
+        values.put(OWNER, fileStatus.getOwner());
+        values.put(GROUP, fileStatus.getGroup());
+        values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
+        values.put(SIZE, fileStatus.getLen());
+        values.put(REPLICATION, fileStatus.getReplication());
+
+        final FsPermission permission = fileStatus.getPermission();
+        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
+        values.put(PERMISSIONS, perms);
+
+        values.put(IS_DIRECTORY, fileStatus.isDirectory());
+        values.put(IS_SYM_LINK, fileStatus.isSymlink());
+        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
+        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
+
+        return new MapRecord(getRecordSchema(), values);
+    }
+
+    private RecordSchema getRecordSchema() {
+        return RECORD_SCHEMA;
+    }
+
     private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
         final Set<FileStatus> statusSet = new HashSet<>();
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
index 804208c..732b9b7 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
@@ -17,7 +17,7 @@
 
 <head>
     <meta charset="utf-8"/>
-    <title>PutHDFS</title>
+    <title>ListHDFS</title>
     <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
 </head>
 
@@ -105,5 +105,92 @@ ListHDFS configuration:
     <li>/data/csv/2.csv</li>
     <li>/data/csv/3.csv</li>
 </ul>
+
+
+<h1>Streaming Versus Batch Processing</h1>
+
+<p>
+    ListHDFS performs a listing of all files that it encounters in the configured HDFS directory.
+    There are two common, broadly defined use cases.
+</p>
+
+<h3>Streaming Use Case</h3>
+
+<p>
+    By default, the Processor will create a separate FlowFile for each file in the directory and add attributes for filename, path, etc.
+    A common use case is to connect ListHDFS to the FetchHDFS processor. These two processors used in conjunction with one another provide the ability to
+    easily monitor a directory and fetch the contents of any new file as it lands in HDFS in an efficient streaming fashion.
+</p>
+
+<h3>Batch Use Case</h3>
+<p>
+    Another common use case is the desire to process all newly arriving files in a given directory, and to then perform some action
+    only when all files have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently
+    a streaming platform in that there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.
+</p>
+
+<p>
+    To solve this, the ListHDFS Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single
+    FlowFile will be created that will contain a Record for each file in the directory, instead of a separate FlowFile per file.
+    See the documentation for ListFile for an example of how to build a dataflow that allows for processing all of the files before proceeding
+    with any other step.
+</p>
+
+<p>
+    One important difference between the data produced by ListFile and ListHDFS, though, is the structure of the Records that are emitted. The Records
+    emitted by ListFile have a different schema than those emitted by ListHDFS. ListHDFS emits records that follow the following schema (in Avro format):
+</p>
+
+<code>
+    <pre>
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [{
+    "name": "filename",
+    "type": "string"
+  }, {
+    "name": "path",
+    "type": "string"
+  }, {
+    "name": "directory",
+    "type": "boolean"
+  }, {
+    "name": "size",
+    "type": "long"
+  }, {
+    "name": "lastModified",
+    "type": {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }
+  }, {
+    "name": "permissions",
+    "type": ["null", "string"]
+  }, {
+    "name": "owner",
+    "type": ["null", "string"]
+  }, {
+    "name": "group",
+    "type": ["null", "string"]
+  }, {
+    "name": "replication",
+    "type": ["null", "int"]
+  }, {
+    "name": "symLink",
+    "type": ["null", "boolean"]
+  }, {
+    "name": "encrypted",
+    "type": ["null", "boolean"]
+  }, {
+    "name": "erasureCoded",
+    "type": ["null", "boolean"]
+  }]
+}
+    </pre>
+</code>
+
+
 </body>
 </html>
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 18c7e34..3de9166 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -16,40 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
-import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
-import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Stream;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -75,6 +41,39 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_FILES_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class TestListHDFS {
 
     private TestRunner runner;
@@ -712,12 +711,6 @@ public class TestListHDFS {
         Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
         runner.run();
 
-        final ArgumentCaptor<Object[]> loggingArgsCaptor = ArgumentCaptor.forClass(Object[].class);
-        verify(mockLogger, atLeastOnce()).error(anyString(), loggingArgsCaptor.capture());
-        // assert that FNFE exceptions were logged for the Directory property's value.
-        assertTrue(loggingArgsCaptor.getAllValues().stream().flatMap(Stream::of)
-                .anyMatch(o -> o instanceof FileNotFoundException && ((FileNotFoundException)o).getMessage().contains(nonExistingPath)));
-
         // assert that no files were listed
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
         // assert that no files were penalized
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
index fa7dc5c..7032652 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -38,9 +38,20 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -171,6 +182,16 @@ public class ListDatabaseTables extends AbstractProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
+            "all entities will be written to a single FlowFile instead of adding attributes to individual FlowFiles.")
+        .required(false)
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .build();
+
+
     private static final List<PropertyDescriptor> propertyDescriptors;
     private static final Set<Relationship> relationships;
 
@@ -179,17 +200,18 @@ public class ListDatabaseTables extends AbstractProcessor {
      * Will also create a Set of relationships
      */
     static {
-        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        final List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(DBCP_SERVICE);
         _propertyDescriptors.add(CATALOG);
         _propertyDescriptors.add(SCHEMA_PATTERN);
         _propertyDescriptors.add(TABLE_NAME_PATTERN);
         _propertyDescriptors.add(TABLE_TYPES);
         _propertyDescriptors.add(INCLUDE_COUNT);
+        _propertyDescriptors.add(RECORD_WRITER);
         _propertyDescriptors.add(REFRESH_INTERVAL);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
-        Set<Relationship> _relationships = new HashSet<>();
+        final Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_SUCCESS);
         relationships = Collections.unmodifiableSet(_relationships);
     }
@@ -227,7 +249,16 @@ public class ListDatabaseTables extends AbstractProcessor {
             throw new ProcessException(ioe);
         }
 
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final TableListingWriter writer;
+        if (writerFactory == null) {
+            writer = new AttributeTableListingWriter(session);
+        } else {
+            writer = new RecordTableListingWriter(session, writerFactory, getLogger());
+        }
+
         try (final Connection con = dbcpService.getConnection(Collections.emptyMap())) {
+            writer.beginListing();
 
             DatabaseMetaData dbMetaData = con.getMetaData();
             try (ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes)) {
@@ -239,11 +270,11 @@ public class ListDatabaseTables extends AbstractProcessor {
                     final String tableRemarks = rs.getString(5);
 
                     // Build fully-qualified name
-                    String fqn = Stream.of(tableCatalog, tableSchema, tableName)
-                      .filter(segment -> !StringUtils.isEmpty(segment))
-                      .collect(Collectors.joining("."));
+                    final String fqn = Stream.of(tableCatalog, tableSchema, tableName)
+                        .filter(segment -> !StringUtils.isEmpty(segment))
+                        .collect(Collectors.joining("."));
 
-                    String lastTimestampForTable = stateMapProperties.get(fqn);
+                    final String lastTimestampForTable = stateMapProperties.get(fqn);
                     boolean refreshTable = true;
                     try {
                         // Refresh state if the interval has elapsed
@@ -252,8 +283,8 @@ public class ListDatabaseTables extends AbstractProcessor {
                         if (!StringUtils.isEmpty(lastTimestampForTable)) {
                             lastRefreshed = Long.parseLong(lastTimestampForTable);
                         }
-                        if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed
-                                                                                           + refreshInterval))) {
+
+                        if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval))) {
                             stateMapProperties.remove(lastTimestampForTable);
                         } else {
                             refreshTable = false;
@@ -265,9 +296,11 @@ public class ListDatabaseTables extends AbstractProcessor {
                         context.yield();
                         return;
                     }
+
                     if (refreshTable) {
-                        FlowFile flowFile = session.create();
                         logger.info("Found {}: {}", new Object[] {tableType, fqn});
+                        final Map<String, String> tableInformation = new HashMap<>();
+
                         if (includeCount) {
                             try (Statement st = con.createStatement()) {
                                 final String countQuery = "SELECT COUNT(1) FROM " + fqn;
@@ -275,41 +308,42 @@ public class ListDatabaseTables extends AbstractProcessor {
                                 logger.debug("Executing query: {}", new Object[] {countQuery});
                                 try (ResultSet countResult = st.executeQuery(countQuery)) {
                                     if (countResult.next()) {
-                                        flowFile = session
-                                          .putAttribute(flowFile, DB_TABLE_COUNT,
-                                            Long.toString(countResult.getLong(1)));
+                                        tableInformation.put(DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
                                     }
                                 }
-                            } catch (SQLException se) {
+                            } catch (final SQLException se) {
                                 logger.error("Couldn't get row count for {}", new Object[] {fqn});
-                                session.remove(flowFile);
                                 continue;
                             }
                         }
+
                         if (tableCatalog != null) {
-                            flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog);
+                            tableInformation.put(DB_TABLE_CATALOG, tableCatalog);
                         }
                         if (tableSchema != null) {
-                            flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema);
+                            tableInformation.put(DB_TABLE_SCHEMA, tableSchema);
                         }
-                        flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName);
-                        flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn);
-                        flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType);
+                        tableInformation.put(DB_TABLE_NAME, tableName);
+                        tableInformation.put(DB_TABLE_FULLNAME, fqn);
+                        tableInformation.put(DB_TABLE_TYPE, tableType);
                         if (tableRemarks != null) {
-                            flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks);
+                            tableInformation.put(DB_TABLE_REMARKS, tableRemarks);
                         }
 
                         String transitUri;
                         try {
                             transitUri = dbMetaData.getURL();
-                        } catch (SQLException sqle) {
+                        } catch (final SQLException sqle) {
                             transitUri = "<unknown>";
                         }
-                        session.getProvenanceReporter().receive(flowFile, transitUri);
-                        session.transfer(flowFile, REL_SUCCESS);
+
+                        writer.addToListing(tableInformation, transitUri);
+
                         stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
                     }
                 }
+
+                writer.finishListing();
             }
             // Update the timestamps for listed tables
             if (stateMap.getVersion() == -1) {
@@ -318,8 +352,148 @@ public class ListDatabaseTables extends AbstractProcessor {
                 stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
             }
 
-        } catch (final SQLException | IOException e) {
+        } catch (final SQLException | IOException | SchemaNotFoundException e) {
+            writer.finishListingExceptionally(e);
+            session.rollback();
             throw new ProcessException(e);
         }
     }
+
+    interface TableListingWriter {
+        void beginListing() throws IOException, SchemaNotFoundException;
+
+        void addToListing(Map<String, String> tableInformation, String transitUri) throws IOException;
+
+        void finishListing() throws IOException;
+
+        void finishListingExceptionally(Exception cause);
+    }
+
+
+    private static class AttributeTableListingWriter implements TableListingWriter {
+        private final ProcessSession session;
+
+        public AttributeTableListingWriter(final ProcessSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public void beginListing() {
+        }
+
+        @Override
+        public void addToListing(final Map<String, String> tableInformation, final String transitUri) {
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, tableInformation);
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+
+        @Override
+        public void finishListing() {
+        }
+
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+        }
+    }
+
+    static class RecordTableListingWriter implements TableListingWriter {
+        private static final RecordSchema RECORD_SCHEMA;
+        public static final String TABLE_NAME = "tableName";
+        public static final String TABLE_CATALOG = "catalog";
+        public static final String TABLE_SCHEMA = "schemaName";
+        public static final String TABLE_FULLNAME = "fullName";
+        public static final String TABLE_TYPE = "tableType";
+        public static final String TABLE_REMARKS = "remarks";
+        public static final String TABLE_ROW_COUNT = "rowCount";
+
+
+        static {
+            final List<RecordField> fields = new ArrayList<>();
+            fields.add(new RecordField(TABLE_NAME, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(TABLE_CATALOG, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(TABLE_SCHEMA, RecordFieldType.STRING.getDataType()));
+            fields.add(new RecordField(TABLE_FULLNAME, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(TABLE_TYPE, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(TABLE_REMARKS, RecordFieldType.STRING.getDataType(), false));
+            fields.add(new RecordField(TABLE_ROW_COUNT, RecordFieldType.LONG.getDataType(), false));
+            RECORD_SCHEMA = new SimpleRecordSchema(fields);
+        }
+
+
+        private final ProcessSession session;
+        private final RecordSetWriterFactory writerFactory;
+        private final ComponentLog logger;
+        private RecordSetWriter recordWriter;
+        private FlowFile flowFile;
+        private String transitUri;
+
+        public RecordTableListingWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+            this.session = session;
+            this.writerFactory = writerFactory;
+            this.logger = logger;
+        }
+
+        @Override
+        public void beginListing() throws IOException, SchemaNotFoundException {
+            flowFile = session.create();
+
+            final OutputStream out = session.write(flowFile);
+            recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile);
+            recordWriter.beginRecordSet();
+        }
+
+        @Override
+        public void addToListing(final Map<String, String> tableInfo, final String transitUri) throws IOException {
+            this.transitUri = transitUri;
+            recordWriter.write(createRecordForListing(tableInfo));
+        }
+
+        @Override
+        public void finishListing() throws IOException {
+            final WriteResult writeResult = recordWriter.finishRecordSet();
+            recordWriter.close();
+
+            if (writeResult.getRecordCount() == 0) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.transfer(flowFile, REL_SUCCESS);
+                session.getProvenanceReporter().receive(flowFile, transitUri);
+            }
+        }
+
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+            try {
+                recordWriter.close();
+            } catch (IOException e) {
+                logger.error("Failed to write listing as Records due to {}", new Object[] {e}, e);
+            }
+
+            session.remove(flowFile);
+        }
+
+        private Record createRecordForListing(final Map<String, String> tableInfo) {
+            final Map<String, Object> values = new HashMap<>();
+            values.put(TABLE_NAME, tableInfo.get(DB_TABLE_NAME));
+            values.put(TABLE_FULLNAME, tableInfo.get(DB_TABLE_FULLNAME));
+            values.put(TABLE_CATALOG, tableInfo.get(DB_TABLE_CATALOG));
+            values.put(TABLE_REMARKS, tableInfo.get(DB_TABLE_REMARKS));
+            values.put(TABLE_SCHEMA, tableInfo.get(DB_TABLE_SCHEMA));
+            values.put(TABLE_TYPE, tableInfo.get(DB_TABLE_TYPE));
+
+            final String rowCountString = tableInfo.get(DB_TABLE_COUNT);
+            if (rowCountString != null) {
+                values.put(TABLE_ROW_COUNT, Long.parseLong(rowCountString));
+            }
+
+            return new MapRecord(RECORD_SCHEMA, values);
+        }
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index 73a7f7c..f8282d4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -17,13 +17,10 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -37,8 +34,12 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.list.ListedEntityTracker;
-import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -77,6 +78,7 @@ public class ListFTP extends ListFileTransfer {
         properties.add(USERNAME);
         properties.add(FTPTransfer.PASSWORD);
         properties.add(REMOTE_PATH);
+        properties.add(RECORD_WRITER);
         properties.add(DISTRIBUTED_CACHE_SERVICE);
         properties.add(FTPTransfer.RECURSIVE_SEARCH);
         properties.add(FTPTransfer.FOLLOW_SYMLINK);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 1b6639f..8d06dff 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -44,6 +44,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.processors.standard.util.FileInfo;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.Tuple;
 
 import java.io.File;
@@ -291,6 +292,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
         properties.add(DIRECTORY);
         properties.add(LISTING_STRATEGY);
         properties.add(RECURSE);
+        properties.add(RECORD_WRITER);
         properties.add(DIRECTORY_LOCATION);
         properties.add(FILE_FILTER);
         properties.add(PATH_FILTER);
@@ -495,6 +497,11 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
     }
 
     @Override
+    protected RecordSchema getRecordSchema() {
+        return FileInfo.getRecordSchema();
+    }
+
+    @Override
     protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
         final Path basePath = new File(getPath(context)).toPath();
         final Boolean recurse = context.getProperty(RECURSE).asBoolean();
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index 554fd4d..3ee554c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -30,6 +30,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.serialization.record.RecordSchema;
+
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
@@ -124,6 +126,11 @@ public abstract class ListFileTransfer extends AbstractListProcessor<FileInfo> {
     }
 
     @Override
+    protected RecordSchema getRecordSchema() {
+        return FileInfo.getRecordSchema();
+    }
+
+    @Override
     protected boolean isListingResetNecessary(final PropertyDescriptor property) {
         return HOSTNAME.equals(property) || REMOTE_PATH.equals(property);
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index ccdf1d8..67c0d05 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -17,19 +17,10 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
 import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -45,11 +36,19 @@ import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.list.ListedEntityTracker;
-import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
+import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import org.apache.nifi.processors.standard.util.SFTPTransfer;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
 @PrimaryNodeOnly
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -76,7 +75,7 @@ import org.apache.nifi.processors.standard.util.SFTPTransfer;
     + "a new Primary Node is selected, the new node will not duplicate the data that was listed by the previous Primary Node.")
 public class ListSFTP extends ListFileTransfer {
 
-    private final AtomicReference<Predicate<FileInfo>> fileFilterRef = new AtomicReference();
+    private volatile Predicate<FileInfo> fileFilter;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -91,6 +90,7 @@ public class ListSFTP extends ListFileTransfer {
         properties.add(SFTPTransfer.PRIVATE_KEY_PATH);
         properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE);
         properties.add(REMOTE_PATH);
+        properties.add(RECORD_WRITER);
         properties.add(DISTRIBUTED_CACHE_SERVICE);
         properties.add(SFTPTransfer.RECURSIVE_SEARCH);
         properties.add(SFTPTransfer.FOLLOW_SYMLINK);
@@ -146,13 +146,13 @@ public class ListSFTP extends ListFileTransfer {
         final List<FileInfo> listing = super.performListing(context, minTimestamp);
 
         return listing.stream()
-                .filter(fileFilterRef.get())
+                .filter(fileFilter)
                 .collect(Collectors.toList());
     }
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        fileFilterRef.set(createFileFilter(context));
+        fileFilter = createFileFilter(context);
     }
 
     private Predicate<FileInfo> createFileFilter(final ProcessContext context) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
index ca6648b..9b33de1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
@@ -16,14 +16,47 @@
  */
 package org.apache.nifi.processors.standard.util;
 
+import java.io.File;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity {
-
     private static final long serialVersionUID = 1L;
 
+    private static final RecordSchema SCHEMA;
+    private static final String FILENAME = "filename";
+    private static final String PATH = "path";
+    private static final String DIRECTORY = "directory";
+    private static final String SIZE = "size";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String PERMISSIONS = "permissions";
+    private static final String OWNER = "owner";
+    private static final String GROUP = "group";
+
+    static {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
+        SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
     private final boolean directory;
     private final long size;
     private final long lastModifiedTime;
@@ -65,6 +98,23 @@ public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEnt
         return group;
     }
 
+    public Record toRecord() {
+        final Map<String, Object> values = new HashMap<>(8);
+        values.put(FILENAME, getFileName());
+        values.put(PATH, new File(getFullPathFileName()).getParent());
+        values.put(DIRECTORY, isDirectory());
+        values.put(SIZE, getSize());
+        values.put(LAST_MODIFIED, getLastModifiedTime());
+        values.put(PERMISSIONS, getPermissions());
+        values.put(OWNER, getOwner());
+        values.put(GROUP, getGroup());
+        return new MapRecord(SCHEMA, values);
+    }
+
+    public static RecordSchema getRecordSchema() {
+        return SCHEMA;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/ListFTP-batch-high-level-flow.png b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/ListFTP-batch-high-level-flow.png
new file mode 100644
index 0000000..0a85909
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/ListFTP-batch-high-level-flow.png differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/ListFTP-batch-processing.png b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/ListFTP-batch-processing.png
new file mode 100644
index 0000000..840e988
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/ListFTP-batch-processing.png differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/additionalDetails.html
new file mode 100644
index 0000000..462d191
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFTP/additionalDetails.html
@@ -0,0 +1,150 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ListFTP</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<p>
+    ListFTP performs a listing of all files that it encounters in the configured directory of an FTP server.
+    There are two common, broadly defined use cases.
+</p>
+
+<h3>Streaming Use Case</h3>
+
+<p>
+    By default, the Processor will create a separate FlowFile for each file in the directory and add attributes for filename, path, etc.
+    A common use case is to connect ListFTP to the FetchFTP processor. These two processors used in conjunction with one another provide the ability to
+    easily monitor a directory and fetch the contents of any new file as it lands on the FTP server in an efficient streaming fashion.
+</p>
+
+<h3>Batch Use Case</h3>
+<p>
+    Another common use case is the desire to process all newly arriving files in a given directory, and to then perform some action
+    only when all files have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently
+    a streaming platform in that there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.
+</p>
+
+<p>
+    To solve this, the ListFTP Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single
+    FlowFile will be created that will contain a Record for each file in the directory, instead of a separate FlowFile per file.
+    With this pattern, in order to fetch the contents of each file, the records must be split up into individual FlowFiles and then
+    fetched. So how does this help us?
+</p>
+
+<p>
+    We can still accomplish the desired use case of waiting until all files in the directory have been processed by splitting apart the FlowFile
+    and processing all of the data within a Process Group. Configuring the Process Group with a FlowFile Concurrency of "Single FlowFile per Node"
+    means that only one FlowFile will be brought into the Process Group. Once that happens, the FlowFile can be split apart and each part processed.
+    Configuring the Process Group with an Outbound Policy of "Batch Output" means that none of the FlowFiles will leave the Process Group until all have
+    finished processing. As a result, we can build a flow like the following:
+</p>
+
+<img src="ListFTP-batch-high-level-flow.png" style="width: 50%; height: 50%" />
+
+<p>
+    In this flow, we perform a listing of a directory with ListFTP. The processor is configured with a Record Writer (in this case a CSV Writer, but any Record Writer can be used)
+    so that only a single FlowFile is generated for the entire listing. That listing is then sent to the "Process Listing" Process Group (shown below). Only after the contents of the entire directory
+    have been processed will data leave the "Process Listing" Process Group. At that point, when all data in the Process Group is ready to leave, each of the processed
+    files will be sent to the "Post-Processing" Process Group. At the same time, the original listing is to be sent to the "Processing Complete Notification" Process Group.
+    In order to accomplish this, the Process Group must be configured with a FlowFile Concurrency of "Single FlowFile per Node" and an Outbound Policy of "Batch Output."
+</p>
+
+<p>
+    The "Process Listing" Process Group that is described above looks like this:
+</p>
+
+<img src="ListFTP-batch-processing.png" style="width: 50%; height: 50%" />
+
+<p>
+    A listing is received via the "Listing" Input Port. This is then sent directly to the "Listing of Processed Data" Output Port so that when all processing completes,
+    the original listing will be sent out also.
+</p>
+
+<p>
+    Next, the listing is broken apart into an individual FlowFile per record. Because we want to use FetchFTP to fetch the data, we need to get the file's filename
+    and path as FlowFile attributes. This can be done in a few different ways, but the easiest mechanism is to use the PartitionRecord processor.
+    This Processor is configured with a Record Reader that is able to read the data written by ListFTP (in this case, a CSV Reader).
+    The Processor is also configured with two additional user-defined properties:
+</p>
+
+    <ul>
+        <li><code>path: /path</code></li>
+        <li><code>filename: /filename</code></li>
+    </ul>
+
+<p>
+    As a result, each record that comes into the PartitionRecord processor will be split into an individual FlowFile (because the combination of the "path" and "filename" fields will be unique
+    for each Record) and the "filename" and "path" record fields will become attributes on the FlowFile. FetchFTP is configured to use a value of <code>${path}/${filename}</code>
+    for the "Remote File" property, making use of these attributes.
+</p>
+
+<p>
+    Finally, we process the data - in this example, simply by compressing it with GZIP compression - and send the output to the "Processed Data" Output Port. The data will queue up here
+    until all data is ready to leave the Process Group and then will be released.
+</p>
+
+
+<h3>Record Schema</h3>
+
+<p>
+    When the Processor is configured to write the listing using a Record Writer, the Records will be written using the following schema (in Avro format):
+</p>
+<pre>
+    <code>
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [{
+    "name": "filename",
+    "type": "string"
+  }, {
+    "name": "path",
+    "type": "string"
+  }, {
+    "name": "directory",
+    "type": "boolean"
+  }, {
+    "name": "size",
+    "type": "long"
+  }, {
+    "name": "lastModified",
+    "type": {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }
+  }, {
+    "name": "permissions",
+    "type": ["null", "string"]
+  }, {
+    "name": "owner",
+    "type": ["null", "string"]
+  }, {
+    "name": "group",
+    "type": ["null", "string"]
+  }]
+}
+    </code>
+</pre>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/ListFile-batch-high-level-flow.png b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/ListFile-batch-high-level-flow.png
new file mode 100644
index 0000000..1ece89a
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/ListFile-batch-high-level-flow.png differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/ListFile-batch-processing.png b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/ListFile-batch-processing.png
new file mode 100644
index 0000000..295a725
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/ListFile-batch-processing.png differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/additionalDetails.html
new file mode 100644
index 0000000..d4a1f69
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListFile/additionalDetails.html
@@ -0,0 +1,150 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ListFile</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<p>
+    ListFile performs a listing of all files that it encounters in the configured directory.
+    There are two common, broadly defined use cases.
+</p>
+
+<h3>Streaming Use Case</h3>
+
+<p>
+    By default, the Processor will create a separate FlowFile for each file in the directory and add attributes for filename, path, etc.
+    A common use case is to connect ListFile to the FetchFile processor. These two processors used in conjunction with one another provide the ability to
+    easily monitor a directory and fetch the contents of any new file as it lands in an efficient streaming fashion.
+</p>
+
+<h3>Batch Use Case</h3>
+<p>
+    Another common use case is the desire to process all newly arriving files in a given directory, and to then perform some action
+    only when all files have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently
+    a streaming platform in that there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.
+</p>
+
+<p>
+    To solve this, the ListFile Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single
+    FlowFile will be created that will contain a Record for each file in the directory, instead of a separate FlowFile per file.
+    With this pattern, in order to fetch the contents of each file, the records must be split up into individual FlowFiles and then
+    fetched. So how does this help us?
+</p>
+
+<p>
+    We can still accomplish the desired use case of waiting until all files in the directory have been processed by splitting apart the FlowFile
+    and processing all of the data within a Process Group. Configuring the Process Group with a FlowFile Concurrency of "Single FlowFile per Node"
+    means that only one FlowFile will be brought into the Process Group. Once that happens, the FlowFile can be split apart and each part processed.
+    Configuring the Process Group with an Outbound Policy of "Batch Output" means that none of the FlowFiles will leave the Process Group until all have
+    finished processing. As a result, we can build a flow like the following:
+</p>
+
+<img src="ListFile-batch-high-level-flow.png" style="width: 50%; height: 50%" />
+
+<p>
+    In this flow, we perform a listing of a directory with ListFile. The processor is configured with a Record Writer (in this case a CSV Writer, but any Record Writer can be used)
+    so that only a single FlowFile is generated for the entire listing. That listing is then sent to the "Process Listing" Process Group (shown below). Only after the contents of the entire directory
+    have been processed will data leave the "Process Listing" Process Group. At that point, when all data in the Process Group is ready to leave, each of the processed
+    files will be sent to the "Post-Processing" Process Group. At the same time, the original listing is to be sent to the "Processing Complete Notification" Process Group.
+    In order to accomplish this, the Process Group must be configured with a FlowFile Concurrency of "Single FlowFile per Node" and an Outbound Policy of "Batch Output."
+</p>
+
+<p>
+    The "Process Listing" Process Group that is described above looks like this:
+</p>
+
+<img src="ListFile-batch-processing.png" style="width: 50%; height: 50%" />
+
+<p>
+    A listing is received via the "Listing" Input Port. This is then sent directly to the "Listing of Processed Data" Output Port so that when all processing completes,
+    the original listing will be sent out also.
+</p>
+
+<p>
+    Next, the listing is broken apart into an individual FlowFile per record. Because we want to use FetchFile to fetch the data, we need to get the file's filename
+    and path as FlowFile attributes. This can be done in a few different ways, but the easiest mechanism is to use the PartitionRecord processor.
+    This Processor is configured with a Record Reader that is able to read the data written by ListFile (in this case, a CSV Reader).
+    The Processor is also configured with two additional user-defined properties:
+</p>
+
+    <ul>
+        <li><code>absolute.path: /path</code></li>
+        <li><code>filename: /filename</code></li>
+    </ul>
+
+<p>
+    As a result, each record that comes into the PartitionRecord processor will be split into an individual FlowFile (because the combination of the "path" and "filename" fields will be unique
+    for each Record) and the "filename" and "path" record fields will become attributes on the FlowFile (using attribute names of "absolute.path" and "filename"). FetchFile uses default
+    configuration, which references these attributes.
+</p>
+
+<p>
+    Finally, we process the data - in this example, simply by compressing it with GZIP compression - and send the output to the "Processed Data" Output Port. The data will queue up here
+    until all data is ready to leave the Process Group and then will be released.
+</p>
+
+
+<h3>Record Schema</h3>
+
+<p>
+    When the Processor is configured to write the listing using a Record Writer, the Records will be written using the following schema (in Avro format):
+</p>
+<pre>
+    <code>
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [{
+    "name": "filename",
+    "type": "string"
+  }, {
+    "name": "path",
+    "type": "string"
+  }, {
+    "name": "directory",
+    "type": "boolean"
+  }, {
+    "name": "size",
+    "type": "long"
+  }, {
+    "name": "lastModified",
+    "type": {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }
+  }, {
+    "name": "permissions",
+    "type": ["null", "string"]
+  }, {
+    "name": "owner",
+    "type": ["null", "string"]
+  }, {
+    "name": "group",
+    "type": ["null", "string"]
+  }]
+}
+    </code>
+</pre>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/ListSFTP-batch-high-level-flow.png b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/ListSFTP-batch-high-level-flow.png
new file mode 100644
index 0000000..e9c8e6e
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/ListSFTP-batch-high-level-flow.png differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/ListSFTP-batch-processing.png b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/ListSFTP-batch-processing.png
new file mode 100644
index 0000000..b5f5ce5
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/ListSFTP-batch-processing.png differ
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/additionalDetails.html
new file mode 100644
index 0000000..624099e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ListSFTP/additionalDetails.html
@@ -0,0 +1,150 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ListSFTP</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+
+<p>
+    ListSFTP performs a listing of all files that it encounters in the configured directory of an SFTP server.
+    There are two common, broadly defined use cases.
+</p>
+
+<h3>Streaming Use Case</h3>
+
+<p>
+    By default, the Processor will create a separate FlowFile for each file in the directory and add attributes for filename, path, etc.
+    A common use case is to connect ListSFTP to the FetchSFTP processor. These two processors used in conjunction with one another provide the ability to
+    easily monitor a directory and fetch the contents of any new file as it lands on the SFTP server in an efficient streaming fashion.
+</p>
+
+<h3>Batch Use Case</h3>
+<p>
+    Another common use case is the desire to process all newly arriving files in a given directory, and to then perform some action
+    only when all files have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently
+    a streaming platform in that there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.
+</p>
+
+<p>
+    To solve this, the ListSFTP Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single
+    FlowFile will be created that will contain a Record for each file in the directory, instead of a separate FlowFile per file.
+    With this pattern, in order to fetch the contents of each file, the records must be split up into individual FlowFiles and then
+    fetched. So how does this help us?
+</p>
+
+<p>
+    We can still accomplish the desired use case of waiting until all files in the directory have been processed by splitting apart the FlowFile
+    and processing all of the data within a Process Group. Configuring the Process Group with a FlowFile Concurrency of "Single FlowFile per Node"
+    means that only one FlowFile will be brought into the Process Group. Once that happens, the FlowFile can be split apart and each part processed.
+    Configuring the Process Group with an Outbound Policy of "Batch Output" means that none of the FlowFiles will leave the Process Group until all have
+    finished processing. As a result, we can build a flow like the following:
+</p>
+
+<img src="ListSFTP-batch-high-level-flow.png" style="width: 50%; height: 50%" />
+
+<p>
+    In this flow, we perform a listing of a directory with ListSFTP. The processor is configured with a Record Writer (in this case a CSV Writer, but any Record Writer can be used)
+    so that only a single FlowFile is generated for the entire listing. That listing is then sent to the "Process Listing" Process Group (shown below). Only after the contents of the entire directory
+    have been processed will data leave the "Process Listing" Process Group. At that point, when all data in the Process Group is ready to leave, each of the processed
+    files will be sent to the "Post-Processing" Process Group. At the same time, the original listing is to be sent to the "Processing Complete Notification" Process Group.
+    In order to accomplish this, the Process Group must be configured with a FlowFile Concurrency of "Single FlowFile per Node" and an Outbound Policy of "Batch Output."
+</p>
+
+<p>
+    The "Process Listing" Process Group that is described above looks like this:
+</p>
+
+<img src="ListSFTP-batch-processing.png" style="width: 50%; height: 50%" />
+
+<p>
+    A listing is received via the "Listing" Input Port. This is then sent directly to the "Listing of Processed Data" Output Port so that when all processing completes,
+    the original listing will be sent out also.
+</p>
+
+<p>
+    Next, the listing is broken apart into an individual FlowFile per record. Because we want to use FetchSFTP to fetch the data, we need to get the file's filename
+    and path as FlowFile attributes. This can be done in a few different ways, but the easiest mechanism is to use the PartitionRecord processor.
+    This Processor is configured with a Record Reader that is able to read the data written by ListSFTP (in this case, a CSV Reader).
+    The Processor is also configured with two additional user-defined properties:
+</p>
+
+    <ul>
+        <li><code>path: /path</code></li>
+        <li><code>filename: /filename</code></li>
+    </ul>
+
+<p>
+    As a result, each record that comes into the PartitionRecord processor will be split into an individual FlowFile (because the combination of the "path" and "filename" fields will be unique
+    for each Record) and the "filename" and "path" record fields will become attributes on the FlowFile. FetchSFTP is configured to use a value of <code>${path}/${filename}</code>
+    for the "Remote File" property, making use of these attributes.
+</p>
+
+<p>
+    Finally, we process the data - in this example, simply by compressing it with GZIP compression - and send the output to the "Processed Data" Output Port. The data will queue up here
+    until all data is ready to leave the Process Group and then will be released.
+</p>
+
+
+<h3>Record Schema</h3>
+
+<p>
+    When the Processor is configured to write the listing using a Record Writer, the Records will be written using the following schema (in Avro format):
+</p>
+<pre>
+    <code>
+{
+  "type": "record",
+  "name": "nifiRecord",
+  "namespace": "org.apache.nifi",
+  "fields": [{
+    "name": "filename",
+    "type": "string"
+  }, {
+    "name": "path",
+    "type": "string"
+  }, {
+    "name": "directory",
+    "type": "boolean"
+  }, {
+    "name": "size",
+    "type": "long"
+  }, {
+    "name": "lastModified",
+    "type": {
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }
+  }, {
+    "name": "permissions",
+    "type": ["null", "string"]
+  }, {
+    "name": "owner",
+    "type": ["null", "string"]
+  }, {
+    "name": "group",
+    "type": ["null", "string"]
+  }]
+}
+    </code>
+</pre>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
index 321c6ad..21bbf3e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -144,7 +145,40 @@ public class TestListDatabaseTables {
         List<MockFlowFile> results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
         assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
         assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+    }
+
+    @Test
+    public void testListTablesWithCountAsRecord() throws Exception {
+        runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TABLE1");
+            stmt.execute("drop table TEST_TABLE2");
+        } catch (final SQLException sqle) {
+            // Do nothing, may not have existed
+        }
+
+        stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)");
+        stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(null, false);
+        runner.addControllerService("record-writer", recordWriter);
+        runner.setProperty(ListDatabaseTables.RECORD_WRITER, "record-writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1);
 
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(
+            "TEST_TABLE1,,APP,APP.TEST_TABLE1,TABLE,,2\n" +
+                "TEST_TABLE2,,APP,APP.TEST_TABLE2,TABLE,,0\n");
     }
 
     @Test