Posted to issues@nifi.apache.org by "tpalfy (via GitHub)" <gi...@apache.org> on 2023/06/09 14:27:03 UTC

[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1224345469


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterator implements Iterator<FileStatus> {
+
+    private static final String IO_ERROR_MESSAGE = "IO error occured while iterating HFDS";
+
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final Deque<FileStatus> fileStatuses;
+    private final Deque<FileStatus> dirStatuses;
+    private final AtomicLong totalFileCount;
+
+    public FileStatusIterator(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation,

Review Comment:
   I think the algorithm used here is quite smart and looks good.
   It's a fairly crucial part as well, so I think we should have dedicated unit tests for it.
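   For example, a rough sketch of what such a test could look like. It runs against the local file system so it does not depend on which `FileSystem` calls the iterator makes internally; the test class name, the JUnit 5 usage, and the assumption that the iterator returns only files (directories are just traversed) are mine, not part of the PR:
   ```java
   package org.apache.nifi.processors.hadoop.util;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.security.UserGroupInformation;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;

   import java.nio.file.Files;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   import static org.junit.jupiter.api.Assertions.assertEquals;

   class FileStatusIteratorTest {

       @TempDir
       java.nio.file.Path tempDir;

       @Test
       void recursiveListingFindsFilesInSubdirectories() throws Exception {
           // one file in the root directory, one in a subdirectory
           Files.createFile(tempDir.resolve("a.txt"));
           final java.nio.file.Path sub = Files.createDirectory(tempDir.resolve("sub"));
           Files.createFile(sub.resolve("b.txt"));

           final FileSystem localFs = FileSystem.getLocal(new Configuration());

           // iterate through the Iterable wrapper, which is expected to delegate to FileStatusIterator
           final FileStatusIterable statuses = new FileStatusIterable(
                   new Path(tempDir.toUri()), true, localFs, UserGroupInformation.getCurrentUser());

           final Set<String> names = new HashSet<>();
           for (final FileStatus status : statuses) {
               names.add(status.getPath().getName());
           }

           // both files are expected, regardless of directory depth
           assertEquals(new HashSet<>(Arrays.asList("a.txt", "b.txt")), names);
       }
   }
   ```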



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
             this.resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestModifiedStatuses = new ArrayList<>();

Review Comment:
   We need to reset the `latestModificationTime` as well; otherwise it prevents us from starting the listing from scratch even if the state has been cleared.
   ```suggestion
               latestModificationTime = 0L;
               latestModifiedStatuses = new ArrayList<>();
   ```



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -452,243 +278,83 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
-
-        final Set<FileStatus> statuses;
-        try {
-            final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", e);
-            return;
-        }
-
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
-                }
-            } else {
-                createFlowFiles(listable, session);
-            }
-        }
-
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
-        }
-
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
-
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
-            session.commitAsync();
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-        }
-    }
-
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
-        }
-    }
-
-    private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
+        final PathFilter pathFilter = createPathFilter(context);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+        final FileStatusManager fileStatusManager = new FileStatusManager();
+        final Path rootPath = getNormalizedPath(context, DIRECTORY);
+        final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
-            }
-
-            writeResult = recordSetWriter.finishRecordSet();
-        }
+        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
 
-        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
+        final HdfsObjectWriter writer = getHdfsObjectWriter(session, writerFactory, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager);
 
-        session.transfer(flowFile, getSuccessRelationship());
-    }
+        writer.write();
 
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
+        getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
 
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
-        final Set<FileStatus> statusSet = new HashSet<>();
+        if (writer.getListedFileCount() > 0) {
+            final Map<String, String> updatedState = new HashMap<>(1);

Review Comment:
   ```suggestion
               final Map<String, String> updatedState = new HashMap<>();
   ```



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();
+
+    public FileStatusIterable(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation) {

Review Comment:
   Minor: Just wondering if we could get rid of either the `FileStatusIterable` or the `FileStatusIterator`. Specifically, the `Iterable` is really only used so that we can have a `for-each` loop instead of an explicit loop calling `hasNext` and `next`.
   This is more declarative, though, so just a thought.
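   For illustration only, a self-contained toy example (not the PR's classes) of the two call styles being weighed here:
   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   public class IterableVsIteratorDemo {

       public static void main(final String[] args) {
           final List<String> paths = Arrays.asList("/data/a.txt", "/data/b.txt");

           // With an Iterable wrapper (the role FileStatusIterable plays), callers get the declarative for-each form:
           final Iterable<String> iterable = paths::iterator;
           for (final String path : iterable) {
               System.out.println(path);
           }

           // With only an Iterator, callers drive hasNext()/next() explicitly:
           final Iterator<String> iterator = paths.iterator();
           while (iterator.hasNext()) {
               System.out.println(iterator.next());
           }
       }
   }
   ```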
   



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterator implements Iterator<FileStatus> {
+
+    private static final String IO_ERROR_MESSAGE = "IO error occured while iterating HFDS";

Review Comment:
   ```suggestion
       private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating HDFS";
   
   ```



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.OutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+
+public class RecordObjectWriter implements HdfsObjectWriter {
+
+    private static final RecordSchema RECORD_SCHEMA;
+
+    private static final String FILENAME = "filename";
+    private static final String PATH = "path";
+    private static final String IS_DIRECTORY = "directory";
+    private static final String SIZE = "size";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String PERMISSIONS = "permissions";
+    private static final String OWNER = "owner";
+    private static final String GROUP = "group";
+    private static final String REPLICATION = "replication";
+    private static final String IS_SYM_LINK = "symLink";
+    private static final String IS_ENCRYPTED = "encrypted";
+    private static final String IS_ERASURE_CODED = "erasureCoded";
+
+    static {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
+        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
+
+    private final ProcessSession session;
+    private final RecordSetWriterFactory writerFactory;
+    private final ComponentLog logger;
+    private final FileStatusIterable fileStatuses;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;
+    long fileCount;
+
+
+    public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger,
+                              final FileStatusIterable fileStatuses, final long minimumAge, final long maximumAge, final PathFilter pathFilter,
+                              final FileStatusManager fileStatusManager, final long latestModificationTime, final List<String> latestModifiedStatuses) {
+        this.session = session;
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+        this.fileStatuses = fileStatuses;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.latestModificationTime = latestModificationTime;
+        this.latestModifiedStatuses = latestModifiedStatuses;
+        fileCount = 0;
+    }
+
+    @Override
+    public void write() {
+        FlowFile flowFile = session.create();
+
+        final OutputStream out = session.write(flowFile);
+        try (RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)) {
+            recordWriter.beginRecordSet();
+
+            for (FileStatus status : fileStatuses) {
+                if (determineListable(status, minimumAge, maximumAge, pathFilter, latestModificationTime, latestModifiedStatuses)) {
+                    recordWriter.write(createRecordForListing(status));
+                    fileStatusManager.update(status);
+                }
+            }
+
+            WriteResult writeResult = recordWriter.finishRecordSet();
+            fileCount = writeResult.getRecordCount();
+
+            if (fileCount == 0) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (Exception e) {
+            throw new ProcessException("An error occured while writing results", e);

Review Comment:
   ```suggestion
               throw new ProcessException("An error occurred while writing results", e);
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org