Posted to issues@nifi.apache.org by "Lehel44 (via GitHub)" <gi...@apache.org> on 2023/05/12 13:35:37 UTC

[GitHub] [nifi] Lehel44 opened a new pull request, #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Lehel44 opened a new pull request, #7240:
URL: https://github.com/apache/nifi/pull/7240

   
   # Summary
   
   [NIFI-11178](https://issues.apache.org/jira/browse/NIFI-11178)
   
   - Removed FileStatus collections to improve memory usage. The processor sends each FileStatus out to a record or a flowfile if it is eligible for listing.
   - Introduced HdfsObjectWriter (similar to S3ObjectWriter) to handle record and flowfile writing with a common interface.
   - Refactored the recursive file listing method to an iterator, moved the branching conditions to the createPathFilter logic.
   - Completely refactored incremental loading. The last entries are not held back anymore.
   - Added logic for legacy state transitioning.
   - Fixed unit tests.
   - Updated processor documentation.
   
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226922484


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
             this.resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestModifiedStatuses = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestListedTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+            String latestFiles = stateMap.get(LATEST_FILES_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestModificationTime = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+            } else if (latestListedTimestampString != null) {
+                latestModificationTime = Long.parseLong(latestListedTimestampString);
+                latestModifiedStatuses = new ArrayList<>(Arrays.asList(latestFiles.split("\\s")));

Review Comment:
   It looks minor, but it is hard to follow why `latestListedTimestampString` becomes `latestModificationTime` after a simple parse.
   Could we not just call these variables `latestTimestampString`, `latestTimestamp`, `latestFilesString`, and `latestFiles`?
   That would also be more consistent with the state property names.
   I would apply these names in `FileStatusManager` too.
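
For reference, the legacy-state branch in the hunk above reduces to a single mapping from the two legacy timestamps to one new timestamp. The following is a minimal sketch of that mapping only, using the shorter naming suggested in the comment; the class and method names (`LegacyStateSketch`, `toLatestTimestamp`) are hypothetical and not part of the PR.

```java
// Hypothetical helper, not part of the PR: isolates the ternary from the diff.
class LegacyStateSketch {

    // Mirrors the ternary in the hunk above: when both legacy timestamps are
    // equal, the new timestamp is one millisecond later; otherwise the legacy
    // listing timestamp is carried over unchanged.
    static long toLatestTimestamp(final long legacyListingTimestamp, final long legacyEmittedTimestamp) {
        return legacyListingTimestamp == legacyEmittedTimestamp
                ? legacyListingTimestamp + 1
                : legacyListingTimestamp;
    }

    public static void main(final String[] args) {
        System.out.println(toLatestTimestamp(1000L, 1000L)); // prints 1001
        System.out.println(toLatestTimestamp(1000L, 900L));  // prints 1000
    }
}
```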





[GitHub] [nifi] Lehel44 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1228820394


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
+import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
+
+public enum FilterMode implements DescribedValue {
+
+    FILTER_DIRECTORIES_AND_FILES(
+            "filter-mode-directories-and-files",
+            "Directories and Files",
+            "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, only subdirectories with a matching name will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+    FILTER_MODE_FILES_ONLY(
+            "filter-mode-files-only",
+            "Files Only",
+            "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, the entire subdirectory tree will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+
+    FILTER_MODE_FULL_PATH(
+            "filter-mode-full-path",
+            "Full Path",
+            "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+                    + " against the full path of files with and without the scheme and authority.  If "
+                    + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                    + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information."
+    );
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    FilterMode(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static FilterMode forName(String filterMode) {

Review Comment:
   With `valueOf`, the name must exactly match the identifier used to declare the enum constant. In `createPathFilter`,
   `final FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());`
   we get the property value from the context, which is the `value` member of the enum and differs from the constant's name.
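
The difference between the two lookups can be sketched with a trimmed-down enum. This is an illustration under assumptions: `SketchFilterMode` and `FilterModeLookupDemo` are hypothetical names, and the body of `forName` is a guess at one reasonable implementation, since the diff above is cut off before the real method body.

```java
import java.util.Arrays;

// Hypothetical, trimmed-down stand-in for the FilterMode enum in the diff.
enum SketchFilterMode {
    FILTER_DIRECTORIES_AND_FILES("filter-mode-directories-and-files"),
    FILTER_MODE_FILES_ONLY("filter-mode-files-only"),
    FILTER_MODE_FULL_PATH("filter-mode-full-path");

    private final String value;

    SketchFilterMode(final String value) {
        this.value = value;
    }

    String getValue() {
        return value;
    }

    // Assumed implementation: looks up a constant by its value member
    // rather than by the identifier it was declared with.
    static SketchFilterMode forName(final String filterMode) {
        return Arrays.stream(values())
                .filter(mode -> mode.getValue().equals(filterMode))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Invalid filter mode: " + filterMode));
    }
}

class FilterModeLookupDemo {

    // Returns true when Enum.valueOf rejects the given string.
    static boolean valueOfRejects(final String name) {
        try {
            SketchFilterMode.valueOf(name);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final String propertyValue = "filter-mode-files-only";
        System.out.println(SketchFilterMode.forName(propertyValue)); // prints FILTER_MODE_FILES_ONLY
        System.out.println(valueOfRejects(propertyValue));           // prints true
    }
}
```

`valueOf` only accepts the declared identifier (for example `FILTER_MODE_FILES_ONLY`), so passing the property value to it throws `IllegalArgumentException`.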





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226887367


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public interface HdfsObjectWriter {
+
+    void write();
+
+    long getListedFileCount();
+
+    default boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final PathFilter filter,
+                                      final long latestModificationTime, final List<String> latestModifiedStatuses) {
+        final boolean isCopyInProgress = status.getPath().getName().endsWith("_COPYING_");
+        final boolean isFilterAccepted = filter.accept(status.getPath());
+        if (isCopyInProgress || !isFilterAccepted) {
+            return false;
+        }
+        // If the file was created during the processor's last iteration we have to check if it was already listed
+        if (status.getModificationTime() == latestModificationTime) {
+            return !latestModifiedStatuses.contains(status.getPath().toString());
+        }
+
+        final long fileAge = System.currentTimeMillis() - status.getModificationTime();

Review Comment:
   I see the logic was copied from the original version, but calling `System.currentTimeMillis()` multiple times (once for each file) may lead to skipped files, so it is a bug that should be fixed as well.
   
   Example (min age = 5 sec):
   - T0: file1 is created
   - T0+1: file2 is created
   - T0+5000: file1 is listed but it is not old enough (5000 - 0 = 5000 ms), so it will be skipped in this iteration
   - T0+5002: file2 is listed and it is old enough (5002 - 1 = 5001 ms), so it passes the test and will be processed
   
   The problem is that file2 also sets the latest modification time to T0+1, so file1 will not be listed in the next iteration either.
   
   Using a "global" current time (determined before all listings) would solve this issue.
   In this example, if the base timestamp is T0+4999, then neither file is old enough, and both will be processed only in the next iteration.
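
The suggestion can be sketched as follows. This is a simplified illustration, not the PR's code: `MinAgeSketch` and `selectListable` are hypothetical names, files are reduced to a name and a modification time, and the age check is the inverse of the skip condition in the removed `determineListable` method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: every file's age is measured against one timestamp
// that is captured once, before the listing starts.
class MinAgeSketch {

    // Returns the names of files whose age lies within [minimumAge, maximumAge].
    static List<String> selectListable(final Map<String, Long> modificationTimes, final long currentTimeMillis,
                                       final long minimumAge, final long maximumAge) {
        final List<String> listable = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : modificationTimes.entrySet()) {
            final long fileAge = currentTimeMillis - entry.getValue();
            if (fileAge >= minimumAge && fileAge <= maximumAge) {
                listable.add(entry.getKey());
            }
        }
        return listable;
    }

    public static void main(final String[] args) {
        // Times are relative to T0, as in the example above; min age is 5000 ms.
        final Map<String, Long> files = new LinkedHashMap<>();
        files.put("file1", 0L);
        files.put("file2", 1L);

        System.out.println(selectListable(files, 4999L, 5000L, Long.MAX_VALUE)); // prints []
        System.out.println(selectListable(files, 5001L, 5000L, Long.MAX_VALUE)); // prints [file1, file2]
    }
}
```

Because the reference time is fixed, an older file can never be rejected as too young while a newer file is accepted in the same iteration.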





[GitHub] [nifi] Lehel44 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1254212309


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public abstract class HdfsObjectWriter {
+
+    protected final ProcessSession session;
+    protected final FileStatusIterable fileStatusIterable;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;
+    final long currentTimeMillis;
+    long fileCount;
+
+
+    HdfsObjectWriter(ProcessSession session, FileStatusIterable fileStatusIterable, long minimumAge, long maximumAge, PathFilter pathFilter,
+                     FileStatusManager fileStatusManager, long latestModificationTime, List<String> latestModifiedStatuses) {
+        this.session = session;
+        this.fileStatusIterable = fileStatusIterable;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.latestModificationTime = latestModificationTime;
+        this.latestModifiedStatuses = latestModifiedStatuses;
+        currentTimeMillis = System.currentTimeMillis();
+        fileCount = 0L;
+    }
+
+    public abstract void write();
+
+    public long getListedFileCount() {
+        return fileCount;
+    }
+
+    boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final PathFilter filter,

Review Comment:
   Indeed, there is no need for them.





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253194313


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                this.latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager();

Review Comment:
   I agree with @turcsanyip except for the last part. Conceptually, I would advise against initializing a final field from a non-final running field, even if it hasn't changed at that point yet. I would keep those fields in the object writer constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy closed pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy closed pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.
URL: https://github.com/apache/nifi/pull/7240




[GitHub] [nifi] Lehel44 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1254213700


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public abstract class HdfsObjectWriter {

Review Comment:
   Yes, I considered simply `FileStatusWriter`, but `HadoopFileStatusWriter` is more descriptive.





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1199124868


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -409,41 +259,44 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // We have to ensure that we don't continually perform listings, because if we perform two listings within
         // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
         // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
+        if (notEnoughTimeElapsedToRun(context)) {

Review Comment:
   We no longer need to worry about two consecutive `onTrigger` calls happening too quickly. We can remove this check, and we can also remove the waiting logic from the tests.
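
   To illustrate why the timing guard becomes unnecessary, here is a minimal sketch, assuming the processor keeps the latest modification time together with the paths already listed at that time (the diff's `latestTimestamp` and `latestFiles` suggest this). The class `IncrementalListingSketch` and its `Entry` type are hypothetical and are not the PR's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not the PR's code: incremental listing that tolerates
// two listings falling within the same millisecond.
final class IncrementalListingSketch {

    // Assumed stand-in for the path and modification time of a FileStatus.
    static final class Entry {
        final String path;
        final long modified;

        Entry(String path, long modified) {
            this.path = path;
            this.modified = modified;
        }
    }

    private long latestTimestamp = 0L;
    private Set<String> latestFiles = new HashSet<>();

    // Returns the paths that have not been emitted by an earlier call.
    List<String> list(List<Entry> entries) {
        List<String> emitted = new ArrayList<>();
        long newLatest = latestTimestamp;
        Set<String> newLatestFiles = new HashSet<>(latestFiles);

        for (Entry e : entries) {
            boolean newer = e.modified > latestTimestamp;
            boolean sameButUnseen = e.modified == latestTimestamp && !latestFiles.contains(e.path);
            if (!newer && !sameButUnseen) {
                continue; // older than the stored state, or already emitted
            }
            emitted.add(e.path);
            if (e.modified > newLatest) {
                // A newer timestamp appeared, so start a fresh "seen at latest" set.
                newLatest = e.modified;
                newLatestFiles = new HashSet<>();
            }
            if (e.modified == newLatest) {
                newLatestFiles.add(e.path);
            }
        }

        latestTimestamp = newLatest;
        latestFiles = newLatestFiles;
        return emitted;
    }
}
```

   Under these assumptions, a file that appears later with the same modification time as the stored timestamp is still picked up, and files already emitted at that timestamp are skipped, so neither a listing lag nor holding back the newest entries is needed.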





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226928846


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
             this.resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestModifiedStatuses = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestListedTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+            String latestFiles = stateMap.get(LATEST_FILES_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestModificationTime = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+            } else if (latestListedTimestampString != null) {
+                latestModificationTime = Long.parseLong(latestListedTimestampString);
+                latestModifiedStatuses = new ArrayList<>(Arrays.asList(latestFiles.split("\\s")));

Review Comment:
   Are we sure that file and directory names in the path cannot contain spaces?
   Space-separated paths may not work, I'm afraid.
   Just an idea, but other processors store the paths as separate entries in the state, like `latest.file.1`, `latest.file.2`, etc.
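
   A minimal sketch of the per-entry approach suggested above, assuming the state is a plain `Map<String, String>`. The class `LatestFilesStateSketch`, the key name `latest.timestamp`, and the prefix `latest.file.` are illustrative assumptions, not the PR's actual constants.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the PR: stores each path under its own state key
// so that whitespace inside a path cannot corrupt the stored list.
final class LatestFilesStateSketch {

    static final String LATEST_TIMESTAMP_KEY = "latest.timestamp"; // assumed key name
    static final String LATEST_FILE_PREFIX = "latest.file.";       // assumed key prefix

    // Builds the state map: one timestamp entry plus one entry per path.
    static Map<String, String> toState(long latestTimestamp, List<String> latestFiles) {
        Map<String, String> state = new LinkedHashMap<>();
        state.put(LATEST_TIMESTAMP_KEY, Long.toString(latestTimestamp));
        for (int i = 0; i < latestFiles.size(); i++) {
            state.put(LATEST_FILE_PREFIX + i, latestFiles.get(i));
        }
        return state;
    }

    // Reads the paths back by filtering on the key prefix.
    static List<String> latestFilesFrom(Map<String, String> state) {
        return state.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(LATEST_FILE_PREFIX))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("/data/my file.txt", "/data/b.txt");

        // Joining with spaces and splitting on whitespace yields 3 tokens for 2 paths.
        String joined = String.join(" ", files);
        System.out.println(joined.split("\\s").length);

        // Separate entries survive the round trip unchanged.
        System.out.println(latestFilesFrom(toState(42L, files)));
    }
}
```

   The sketch relies on the insertion order of `LinkedHashMap`. A real state map may not guarantee ordering, so the restored paths are best treated as an unordered collection.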





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1255861882


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -36,642 +33,306 @@
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.deprecation.log.DeprecationLogger;
-import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
+import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
+import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.io.File;
+
 import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
 
 @PrimaryNodeOnly
 @TriggerSerially
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
-@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a listing is performed, the files with the latest timestamp will be excluded "
-        + "and picked up during the next execution of the processor. This is done to ensure that we do not miss any files, or produce duplicates, in the "
-        + "cases where files with the same timestamp are written immediately before and after a single execution of the processor. For each file that is "
-        + "listed in HDFS, this processor creates a FlowFile that represents the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
-        +  "designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left "
-        +  "off without duplicating all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file that is listed in HDFS, this processor creates a FlowFile that represents "
+        + "the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only in a cluster. If the primary "
+        + "node changes, the new Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike GetHDFS, this "
+        + "Processor does not delete any data from HDFS.")
 @WritesAttributes({
-    @WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."),
-    @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
-            + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
-            + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
-    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
-    @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"),
-    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
-            + "3 for the group, and 3 for other users. For example rw-rw-r--")
+        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from HDFS."),
+        @WritesAttribute(attribute = "path", description = "The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, "
+                + "then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up "
+                + "from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."),
+        @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+        @WritesAttribute(attribute = "hdfs.length", description = "The number of bytes in the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.replication", description = "The number of HDFS replicas for hte file"),
+        @WritesAttribute(attribute = "hdfs.permissions", description = "The permissions for the file in HDFS. This is formatted as 3 characters for the owner, "
+                + "3 for the group, and 3 for other users. For example rw-rw-r--")
 })
-@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed and the latest "
-        + "timestamp of all the files transferred are both stored. This allows the Processor to list only files that have been added or modified after "
-        + "this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance "
-        + "problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary "
-        + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of HDFS files, the latest timestamp of all the files listed is stored. "
+        + "This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run, "
+        + "without having to store all of the actual filenames/paths which could lead to performance problems. State is stored across the cluster "
+        + "so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous "
+        + "node left off, without duplicating the data.")
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class ListHDFS extends AbstractHadoopProcessor {
 
-    private static final RecordSchema RECORD_SCHEMA;
-    private static final String FILENAME = "filename";
-    private static final String PATH = "path";
-    private static final String IS_DIRECTORY = "directory";
-    private static final String SIZE = "size";
-    private static final String LAST_MODIFIED = "lastModified";
-    private static final String PERMISSIONS = "permissions";
-    private static final String OWNER = "owner";
-    private static final String GROUP = "group";
-    private static final String REPLICATION = "replication";
-    private static final String IS_SYM_LINK = "symLink";
-    private static final String IS_ENCRYPTED = "encrypted";
-    private static final String IS_ERASURE_CODED = "erasureCoded";
-
-    static {
-        final List<RecordField> recordFields = new ArrayList<>();
-        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
-        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
-        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
-        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
-        recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
-        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
-    }
+    private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
 
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
-        .name("Recurse Subdirectories")
-        .description("Indicates whether to list files from subdirectories of the HDFS directory")
-        .required(true)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
+            .name("Recurse Subdirectories")
+            .description("Indicates whether to list files from subdirectories of the HDFS directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
 
     public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
-            "all entities will be written to a single FlowFile.")
-        .required(false)
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .build();
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each "
+                    + "entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile.")
+            .required(false)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
 
     public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
-        .name("File Filter")
-        .description("Only files whose names match the given regular expression will be picked up")
-        .required(true)
-        .defaultValue("[^\\.].*")
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    private static final String FILTER_MODE_DIRECTORIES_AND_FILES = "filter-mode-directories-and-files";
-    private static final String FILTER_MODE_FILES_ONLY = "filter-mode-files-only";
-    private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
-    static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
-        "Directories and Files",
-        "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
-                + " is set to true, only subdirectories with a matching name will be searched for files that match "
-                + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
-        "Files Only",
-        "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
-                + " is set to true, the entire subdirectory tree will be searched for files that match "
-                + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
-        "Full Path",
-        "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
-                + " against the full path of files with and without the scheme and authority.  If "
-                + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
-                + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information.");
+            .name("File Filter")
+            .description("Only files whose names match the given regular expression will be picked up")
+            .required(true)
+            .defaultValue(NON_HIDDEN_FILES_REGEX)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
 
     public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
-        .name("file-filter-mode")
-        .displayName("File Filter Mode")
-        .description("Determines how the regular expression in  " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
-        .required(true)
-        .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
-        .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
-        .name("minimum-file-age")
-        .displayName("Minimum File Age")
-        .description("The minimum age that a file must be in order to be pulled; any file younger than this "
-                + "amount of time (based on last modification date) will be ignored")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
-
-    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
-        .name("maximum-file-age")
-        .displayName("Maximum File Age")
-        .description("The maximum age that a file must be in order to be pulled; any file older than this "
-                + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
+            .name("file-filter-mode")
+            .displayName("File Filter Mode")
+            .description("Determines how the regular expression in  " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
+            .required(true)
+            .allowableValues(FilterMode.class)
+            .defaultValue(FILTER_DIRECTORIES_AND_FILES.getValue())
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MINIMUM_FILE_AGE = new PropertyDescriptor.Builder()
+            .name("minimum-file-age")
+            .displayName("Minimum File Age")
+            .description("The minimum age that a file must be in order to be pulled; any file younger than this "
+                    + "amount of time (based on last modification date) will be ignored")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
+
+    public static final PropertyDescriptor MAXIMUM_FILE_AGE = new PropertyDescriptor.Builder()
+            .name("maximum-file-age")
+            .displayName("Maximum File Age")
+            .description("The maximum age that a file must be in order to be pulled; any file older than this "
+                    + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles are transferred to this relationship")
-        .build();
-
-    private static final DeprecationLogger deprecationLogger = DeprecationLoggerFactory.getLogger(ListHDFS.class);
-
-    private volatile long latestTimestampListed = -1L;
-    private volatile long latestTimestampEmitted = -1L;
-    private volatile long lastRunTimestamp = -1L;
-    private volatile boolean resetState = false;
-    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
-    static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
-
-    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+            .name("success")
+            .description("All FlowFiles are transferred to this relationship")
+            .build();
+    public static final String LEGACY_EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
+    public static final String LEGACY_LISTING_TIMESTAMP_KEY = "listing.timestamp";
+    public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
+    public static final String LATEST_FILES_KEY = "latest.file.%d";
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
     private Pattern fileFilterRegexPattern;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        super.init(context);
-    }
+    private volatile boolean resetState = false;
 
     @Override
     protected void preProcessConfiguration(Configuration config, ProcessContext context) {
         super.preProcessConfiguration(config, context);
         // Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
         fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-
-    }
-
-    protected File getPersistenceFile() {
-        return new File("conf/state/" + getIdentifier());
     }
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
-        props.add(RECURSE_SUBDIRS);
-        props.add(RECORD_WRITER);
-        props.add(FILE_FILTER);
-        props.add(FILE_FILTER_MODE);
-        props.add(MIN_AGE);
-        props.add(MAX_AGE);
+        props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
         return props;
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        return relationships;
+        return RELATIONSHIPS;
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext context) {
 
         final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));
 
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
         final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
 
         if (minimumAge > maximumAge) {
-            problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
-                    .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
+            problems.add(new ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
+                    .explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
-        }
-    }
-
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
+            resetState = true;
         }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
     }
 
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
+        final long latestTimestamp;
+        final List<String> latestFiles;
         try {
             final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
+            final String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                latestFiles = new ArrayList<>();
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+                latestTimestamp = 0L;
+                latestFiles = new ArrayList<>();
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
             final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", e);
-            return;
-        }
+            final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
-                }
-            } else {
-                createFlowFiles(listable, session);
-            }
-        }
-
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
-        }
-
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
-
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
-            session.commitAsync();
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-        }
-    }
+            final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+            final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
 
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
-        }
-    }
-
-    private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
-
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
+            final HadoopFileStatusWriter writer;
+            if (writerFactory == null) {
+                writer = new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, latestFiles);
+            } else {
+                writer = new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
+                        latestFiles, writerFactory, getLogger());
             }
 
-            writeResult = recordSetWriter.finishRecordSet();
-        }
+            writer.write();
 
-        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
-
-        session.transfer(flowFile, getSuccessRelationship());
-    }
-
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
-
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
+            getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
-        final Set<FileStatus> statusSet = new HashSet<>();
-
-        getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses;
-        if (isPostListingFilterNeeded(filterMode)) {
-            // For this filter mode, the filter is not passed to listStatus, so that directory names will not be
-            // filtered out when the listing is recursive.
-            statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
-        } else {
-            statuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path, filter));
-        }
-
-        for ( final FileStatus status : statuses ) {
-            if ( status.isDirectory() ) {
-                if ( recursive ) {
-                    try {
-                        statusSet.addAll(getStatuses(status.getPath(), recursive, hdfs, filter, filterMode));
-                    } catch (final IOException ioe) {
-                        getLogger().error("Failed to retrieve HDFS listing for subdirectory {} due to {}; will continue listing others", new Object[] {status.getPath(), ioe});
-                    }
+            if (writer.getListedFileCount() > 0) {
+                final Map<String, String> updatedState = new HashMap<>();
+                updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+                final List<String> files = fileStatusManager.getCurrentLatestFiles();
+                for (int i = 0; i < files.size(); i++) {
+                    final String currentFilePath = files.get(i);
+                    updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
                 }
+                getLogger().debug("New state map: {}", updatedState);
+                updateState(session, updatedState);
+
+                getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
             } else {
-                if (isPostListingFilterNeeded(filterMode)) {
-                    // Filtering explicitly performed here, since it was not able to be done when calling listStatus.
-                    if (filter.accept(status.getPath())) {
-                        statusSet.add(status);
-                    }
-                } else {
-                    statusSet.add(status);
-                }
+                getLogger().debug("There is no data to list. Yielding.");
+                context.yield();
             }
+        } catch (IOException e) {
+            throw new ProcessException("IO error occurred when closing HDFS file system", e);
         }
-
-        return statusSet;
-    }
-
-    /**
-     * Determines if filtering needs to be applied, after calling {@link FileSystem#listStatus(Path)}, based on the
-     * given filter mode.
-     * Filter modes that need to be able to search directories regardless of the given filter should return true.
-     * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link FileSystem#listStatus(Path)} be invoked
-     * without a filter so that all directories can be traversed when filtering with these modes.
-     * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering can be applied directly with
-     * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a recursive listing.
-     * @param filterMode the value of one of the defined AllowableValues representing filter modes
-     * @return true if results need to be filtered, false otherwise
-     */
-    private boolean isPostListingFilterNeeded(String filterMode) {
-        return filterMode.equals(FILTER_MODE_FILES_ONLY) || filterMode.equals(FILTER_MODE_FULL_PATH);
-    }
-
-    private String getAbsolutePath(final Path path) {
-        final Path parent = path.getParent();
-        final String prefix = (parent == null || parent.getName().equals("")) ? "" : getAbsolutePath(parent);
-        return prefix + "/" + path.getName();
-    }
-
-    private Map<String, String> createAttributes(final FileStatus status) {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
-        attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
-
-        attributes.put(getAttributePrefix() + ".owner", status.getOwner());
-        attributes.put(getAttributePrefix() + ".group", status.getGroup());
-        attributes.put(getAttributePrefix() + ".lastModified", String.valueOf(status.getModificationTime()));
-        attributes.put(getAttributePrefix() + ".length", String.valueOf(status.getLen()));
-        attributes.put(getAttributePrefix() + ".replication", String.valueOf(status.getReplication()));
-
-        final FsPermission permission = status.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        attributes.put(getAttributePrefix() + ".permissions", perms);
-        return attributes;
-    }
-
-    private String getPerms(final FsAction action) {
-        final StringBuilder sb = new StringBuilder();
-        if (action.implies(FsAction.READ)) {
-            sb.append("r");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.WRITE)) {
-            sb.append("w");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.EXECUTE)) {
-            sb.append("x");
-        } else {
-            sb.append("-");
-        }
-
-        return sb.toString();
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
-        final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
-        return path -> {
-            final boolean accepted;
-            if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
+        final FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
+        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+        switch (filterMode) {
+            case FILTER_MODE_FILES_ONLY:
+                return path -> fileFilterRegexPattern.matcher(path.getName()).matches();
+            case FILTER_MODE_FULL_PATH:
+                return path -> fileFilterRegexPattern.matcher(path.toString()).matches()
                         || fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
-            } else {
-                accepted =  fileFilterRegexPattern.matcher(path.getName()).matches();
-            }
-            return accepted;
-        };
+            // FILTER_DIRECTORIES_AND_FILES
+            default:
+                return path -> Stream.of(path.toString().split("/"))

Review Comment:
   This case does not handle the path properly: it operates on the entire URL, scheme and authority included, rather than on the path alone. This can lead to dropping files that should be listed.
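
   To illustrate the concern, here is a minimal standalone sketch. It does not use the Hadoop `Path` class; plain strings and `java.net.URI` stand in for it. Since the diff is cut off after `split("/")`, the sketch assumes the default case requires every segment to match the File Filter regex. The class name, method names, and the `namenode:8020` authority are hypothetical and only serve the example.

   ```java
   import java.net.URI;
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;

   class PathSegmentFilterSketch {
       // Same regex as the File Filter default in the removed lines: names that do not start with a dot.
       static final Pattern NON_HIDDEN = Pattern.compile("[^\\.].*");

       // Splits the whole string on "/", as the default case in the diff appears to do.
       static List<String> segments(String value) {
           return Arrays.asList(value.split("/"));
       }

       // Assumption: every segment of the full URL string must match the regex.
       static boolean allSegmentsMatch(String url) {
           return segments(url).stream().allMatch(s -> NON_HIDDEN.matcher(s).matches());
       }

       // Possible alternative: match only the path part, ignoring scheme, authority and empty segments.
       static boolean pathSegmentsMatch(String url) {
           String pathOnly = URI.create(url).getPath();
           return segments(pathOnly).stream()
                   .filter(s -> !s.isEmpty())
                   .allMatch(s -> NON_HIDDEN.matcher(s).matches());
       }

       public static void main(String[] args) {
           String url = "hdfs://namenode:8020/data/file.txt";
           // The scheme and authority become segments, plus an empty one between the two slashes.
           System.out.println(segments(url));          // [hdfs:, , namenode:8020, data, file.txt]
           // The empty segment cannot match "[^\\.].*", so a visible file is rejected.
           System.out.println(allSegmentsMatch(url));  // false
           System.out.println(pathSegmentsMatch(url)); // true
       }
   }
   ```

   With the real `Path` API, `Path.getPathWithoutSchemeAndAuthority(path)` (already used in the `FILTER_MODE_FULL_PATH` case above) would be one way to obtain the path part before splitting.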



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226829865


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.ListHDFS;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlowFileObjectWriter implements HdfsObjectWriter {
+
+    private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
+
+    private final ProcessSession session;
+    private final FileStatusIterable fileStatuses;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;
+    private long listedFileCount;
+
+
+    public FlowFileObjectWriter(final ProcessSession session, final FileStatusIterable fileStatuses, final long minimumAge,
+                                final long maximumAge, final PathFilter pathFilter, final FileStatusManager fileStatusManager,
+                                final long latestModificationTime, final List<String> latestModifiedStatuses) {
+        this.session = session;
+        this.fileStatuses = fileStatuses;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.latestModificationTime = latestModificationTime;
+        this.latestModifiedStatuses = latestModifiedStatuses;
+        listedFileCount = 0;

Review Comment:
   The common fields should be extracted into an abstract class.
   `determineListable()` should be moved there too; in that case most of its method parameters are no longer needed.
   There are other common methods as well: `getAbsolutePath()`, `getPerms()`, `getPermissionsString()`, `getListedFileCount()`.
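
A rough sketch of what such a base class might look like. `FileEntry` is a simplified, hypothetical stand-in for Hadoop's `FileStatus`, the class and method names are illustrative rather than the PR's actual code, and the eligibility rules are borrowed from the `determineListable` method that this PR removes from `ListHDFS` (skip `_COPYING_` files and files outside the age window). The current time is passed in as a parameter only to keep the sketch deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.fs.FileStatus.
final class FileEntry {
    final String name;
    final long modificationTime;

    FileEntry(final String name, final long modificationTime) {
        this.name = name;
        this.modificationTime = modificationTime;
    }
}

// Holds the state shared by all writers, so subclasses only implement the output format.
abstract class AbstractListingWriter {
    protected final List<FileEntry> entries;
    protected final long minimumAge;
    protected final long maximumAge;
    protected long listedFileCount;

    protected AbstractListingWriter(final List<FileEntry> entries, final long minimumAge, final long maximumAge) {
        this.entries = entries;
        this.minimumAge = minimumAge;
        this.maximumAge = maximumAge;
    }

    // Needs only the entry (and the clock) because the thresholds are fields.
    protected boolean determineListable(final FileEntry entry, final long now) {
        if (entry.name.endsWith("_COPYING_")) {
            return false;
        }
        final long fileAge = now - entry.modificationTime;
        return fileAge >= minimumAge && fileAge <= maximumAge;
    }

    public long getListedFileCount() {
        return listedFileCount;
    }

    public abstract void write(long now);
}

// Example subclass: collects the names of listable entries.
final class NameCollectingWriter extends AbstractListingWriter {
    final List<String> written = new ArrayList<>();

    NameCollectingWriter(final List<FileEntry> entries, final long minimumAge, final long maximumAge) {
        super(entries, minimumAge, maximumAge);
    }

    @Override
    public void write(final long now) {
        for (final FileEntry entry : entries) {
            if (determineListable(entry, now)) {
                written.add(entry.name);
                listedFileCount++;
            }
        }
    }
}
```

With this shape, a FlowFile-based and a record-based writer would differ only in their `write` implementations.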





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253156101


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();
+
+    public FileStatusIterable(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation) {
+        this.path = path;
+        this.recursive = recursive;
+        this.hdfs = hdfs;
+        this.userGroupInformation = userGroupInformation;
+    }
+
+    @Override
+    public Iterator<FileStatus> iterator() {
+        return new FileStatusIterator();
+    }
+
+    public long getTotalFileCount() {
+        return totalFileCount.get();
+    }
+
+    class FileStatusIterator implements Iterator<FileStatus> {
+
+        private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating HFDS";
+
+        private final Deque<Path> dirStatuses;
+
+        private FileStatus nextFileStatus;
+        private RemoteIterator<FileStatus> hdfsIterator;

Review Comment:
   Or we could call it `fileStatusIterator` (that aspect is more significant imo).
   Whichever name is chosen, rename the `getHdfsIterator` method as well.





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1258310528


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.OutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+
+public class RecordObjectWriter extends HadoopFileStatusWriter {
+
+    private static final RecordSchema RECORD_SCHEMA;
+
+    private static final String FILENAME = "filename";
+    private static final String PATH = "path";
+    private static final String IS_DIRECTORY = "directory";
+    private static final String SIZE = "size";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String PERMISSIONS = "permissions";
+    private static final String OWNER = "owner";
+    private static final String GROUP = "group";
+    private static final String REPLICATION = "replication";
+    private static final String IS_SYM_LINK = "symLink";
+    private static final String IS_ENCRYPTED = "encrypted";
+    private static final String IS_ERASURE_CODED = "erasureCoded";
+
+    static {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
+        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
+
+    private final RecordSetWriterFactory writerFactory;
+    private final ComponentLog logger;
+
+    public RecordObjectWriter(final ProcessSession session,
+                              final FileStatusIterable fileStatuses,
+                              final long minimumAge,
+                              final long maximumAge,
+                              final PathFilter pathFilter,
+                              final FileStatusManager fileStatusManager,
+                              final long previousLatestModificationTime,
+                              final List<String> previousLatestFiles,
+                              final  RecordSetWriterFactory writerFactory,
+                              final ComponentLog logger) {
+        super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+    }
+
+    @Override
+    public void write() {
+        FlowFile flowFile = session.create();
+
+        final OutputStream out = session.write(flowFile);
+        try (RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)) {
+            recordWriter.beginRecordSet();
+
+            for (FileStatus status : fileStatusIterable) {
+                if (determineListable(status)) {
+                    recordWriter.write(createRecordForListing(status));
+                    fileStatusManager.update(status);
+                }
+            }
+
+            WriteResult writeResult = recordWriter.finishRecordSet();
+            fileCount = writeResult.getRecordCount();
+
+            if (fileCount == 0) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (Exception e) {
+            throw new ProcessException("An error occurred while writing results", e);
+        }

Review Comment:
   `session.putAllAttributes` is called while the `OutputStream out` returned by `session.write` is still open, so it throws an exception.
   We can fix it like this:
   ```suggestion
           final WriteResult writeResult;
           try (
               final OutputStream out = session.write(flowFile);
               final RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)
           ) {
               recordWriter.beginRecordSet();
   
               for (FileStatus status : fileStatusIterable) {
                   if (determineListable(status)) {
                       recordWriter.write(createRecordForListing(status));
                       fileStatusManager.update(status);
                   }
               }
   
               writeResult = recordWriter.finishRecordSet();
           } catch (Exception e) {
               throw new ProcessException("An error occurred while writing results", e);
           }
           
           fileCount = writeResult.getRecordCount();
           if (fileCount == 0) {
               session.remove(flowFile);
           } else {
               final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
               attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
               flowFile = session.putAllAttributes(flowFile, attributes);
   
               session.transfer(flowFile, REL_SUCCESS);
           }
   ```
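
The ordering the suggestion relies on can be seen in isolation with a small stand-alone sketch. `TrackedResource` is a hypothetical class, not a NiFi type; the sketch only shows that try-with-resources closes its resources, in reverse declaration order, before any statement after the block runs.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderSketch {

    // Hypothetical resource that records when it is closed.
    static final class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> events;

        TrackedResource(final String name, final List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override
        public void close() {
            events.add("close " + name);
        }
    }

    static List<String> run() {
        final List<String> events = new ArrayList<>();
        try (
            TrackedResource out = new TrackedResource("out", events);
            TrackedResource recordWriter = new TrackedResource("recordWriter", events)
        ) {
            events.add("write records");
        }
        // Both resources are closed by now, so follow-up work is safe here.
        events.add("put attributes");
        return events;
    }

    public static void main(final String[] args) {
        // Prints: write records, close recordWriter, close out, put attributes
        run().forEach(System.out::println);
    }
}
```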





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253194313


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                this.latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager();

Review Comment:
   I agree with @turcsanyip except this:
   
   > In this case, it would also be enough to pass FileStatusManager to the object writers and they can retrieve the previous latest values from it in the constructor.
   
   Conceptually I would advise against initializing a final field from a non-final running field, even if it hasn't changed at that point yet. I would keep those as parameters in the object writer constructor.
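
A small sketch of the distinction, using hypothetical stand-in classes rather than the PR's actual `FileStatusManager` and writers. `ParameterWriter` receives the previous value explicitly, while `ManagerReadingWriter` copies it from a field that keeps changing during the listing, so its result depends on when the constructor happens to run.

```java
class SnapshotSketch {

    // Hypothetical manager whose "current latest" value moves forward during a listing.
    static final class StatusManager {
        private long currentLatestTimestamp;

        StatusManager(final long initial) {
            this.currentLatestTimestamp = initial;
        }

        void update(final long timestamp) {
            currentLatestTimestamp = Math.max(currentLatestTimestamp, timestamp);
        }

        long getCurrentLatestTimestamp() {
            return currentLatestTimestamp;
        }
    }

    // The caller decides which value counts as "previous" and passes it in.
    static final class ParameterWriter {
        final long previousLatestTimestamp;

        ParameterWriter(final long previousLatestTimestamp) {
            this.previousLatestTimestamp = previousLatestTimestamp;
        }
    }

    // Copies a running value; correct only if constructed before any update.
    static final class ManagerReadingWriter {
        final long previousLatestTimestamp;

        ManagerReadingWriter(final StatusManager manager) {
            this.previousLatestTimestamp = manager.getCurrentLatestTimestamp();
        }
    }

    public static void main(final String[] args) {
        final long previous = 100L;
        final StatusManager manager = new StatusManager(previous);
        manager.update(250L); // an update sneaks in before the writers are built

        System.out.println(new ParameterWriter(previous).previousLatestTimestamp);     // 100
        System.out.println(new ManagerReadingWriter(manager).previousLatestTimestamp); // 250
    }
}
```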





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253194313


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                this.latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager();

Review Comment:
   I agree with @turcsanyip except the last part. Conceptually I would advise against initializing a final field from a non-final running field, even if it hasn't changed at that point yet. I would keep those as parameters in the object writer constructor.
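
A minimal sketch of the constructor-parameter approach described in this comment. `TrackerSketch` and `WriterSketch` are hypothetical stand-ins, not the PR's classes, and the `isNew` rule (newer timestamp, or same timestamp with an unseen path) is an assumption about how the snapshot would be used. The point is only that the writer's final fields are filled from explicit arguments, so they cannot silently pick up a value the tracker has already advanced.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a tracker whose fields keep changing during a listing.
class TrackerSketch {
    private long currentLatestTimestamp;
    private final List<String> currentLatestFiles = new ArrayList<>();

    void update(final long timestamp, final String path) {
        if (timestamp > currentLatestTimestamp) {
            currentLatestTimestamp = timestamp;
            currentLatestFiles.clear();
            currentLatestFiles.add(path);
        } else if (timestamp == currentLatestTimestamp) {
            currentLatestFiles.add(path);
        }
    }

    long getCurrentLatestTimestamp() {
        return currentLatestTimestamp;
    }
}

// Hypothetical writer: the previous-run snapshot arrives as constructor parameters
// instead of being read from the tracker's running (non-final) fields.
class WriterSketch {
    private final long previousLatestTimestamp;
    private final List<String> previousLatestFiles;
    private final TrackerSketch tracker;

    WriterSketch(final long previousLatestTimestamp, final List<String> previousLatestFiles, final TrackerSketch tracker) {
        this.previousLatestTimestamp = previousLatestTimestamp;
        this.previousLatestFiles = new ArrayList<>(previousLatestFiles); // defensive copy
        this.tracker = tracker;
    }

    // Assumed rule: an entry is new if it is newer than the snapshot, or has the
    // same timestamp but was not among the files seen at that timestamp.
    boolean isNew(final long timestamp, final String path) {
        final boolean isNew = timestamp > previousLatestTimestamp
                || (timestamp == previousLatestTimestamp && !previousLatestFiles.contains(path));
        if (isNew) {
            tracker.update(timestamp, path); // only the tracker's running state moves
        }
        return isNew;
    }
}
```

With this shape, advancing the tracker during the listing does not change what the writer treats as "previous" state.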





[GitHub] [nifi] tpalfy commented on pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on PR #7240:
URL: https://github.com/apache/nifi/pull/7240#issuecomment-1630806859

   LGTM
   Thank you for your work @Lehel44 and for your review @turcsanyip .
   Merged into main.




[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1224345469


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterator implements Iterator<FileStatus> {
+
+    private static final String IO_ERROR_MESSAGE = "IO error occured while iterating HFDS";
+
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final Deque<FileStatus> fileStatuses;
+    private final Deque<FileStatus> dirStatuses;
+    private final AtomicLong totalFileCount;
+
+    public FileStatusIterator(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation,

Review Comment:
   I think the algorithm used here is quite smart and looks good.
   It's a fairly crucial part as well so I think we should have dedicated unit tests for it.
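
To make the testing suggestion concrete, here is a rough sketch of a two-deque lazy traversal that can be exercised without HDFS. It is an assumption about the general shape of such an iterator, based only on the `fileStatuses` and `dirStatuses` deques visible in the diff; `NodeSketch` and `LazyTreeIteratorSketch` are hypothetical names and an in-memory tree stands in for `FileSystem.listStatus`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical in-memory node standing in for a Hadoop FileStatus.
class NodeSketch {
    final String name;
    final List<NodeSketch> children; // null for a plain file

    NodeSketch(final String name, final List<NodeSketch> children) {
        this.name = name;
        this.children = children;
    }

    boolean isDirectory() {
        return children != null;
    }
}

// Directories are expanded one at a time, only when no pending files remain.
class LazyTreeIteratorSketch implements Iterator<NodeSketch> {
    private final boolean recursive;
    private final Deque<NodeSketch> files = new ArrayDeque<>();
    private final Deque<NodeSketch> dirs = new ArrayDeque<>();

    LazyTreeIteratorSketch(final NodeSketch root, final boolean recursive) {
        this.recursive = recursive;
        expand(root);
    }

    private void expand(final NodeSketch dir) {
        for (final NodeSketch child : dir.children) {
            if (child.isDirectory()) {
                if (recursive) {
                    dirs.push(child);
                }
            } else {
                files.push(child);
            }
        }
    }

    @Override
    public boolean hasNext() {
        // Keep expanding until a file turns up or nothing is left (handles empty directories).
        while (files.isEmpty() && !dirs.isEmpty()) {
            expand(dirs.pop());
        }
        return !files.isEmpty();
    }

    @Override
    public NodeSketch next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return files.pop();
    }

    // Convenience helper for tests: collect the names the iterator yields.
    static List<String> names(final NodeSketch root, final boolean recursive) {
        final List<String> result = new ArrayList<>();
        final Iterator<NodeSketch> iterator = new LazyTreeIteratorSketch(root, recursive);
        while (iterator.hasNext()) {
            result.add(iterator.next().name);
        }
        return result;
    }
}
```

Cases worth covering in dedicated tests would include empty directories, nested directories, the non-recursive mode, and calling `next()` after exhaustion.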



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
             this.resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestModifiedStatuses = new ArrayList<>();

Review Comment:
   We need to reset the `latestModificationTime` as well, otherwise it prevents us from starting the listing from scratch even if the state has been cleared.
   ```suggestion
               latestModificationTime = 0L;
               latestModifiedStatuses = new ArrayList<>();
   ```
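
A small sketch of why both fields need resetting before the stored state is read. `ListingStateSketch` is a hypothetical class using a plain `Map` in place of NiFi's `StateMap`; the `latest.timestamp` key name is an assumption, while the `latest.file` prefix is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical holder for the processor's in-memory listing state.
class ListingStateSketch {
    static final String LATEST_TIMESTAMP_KEY = "latest.timestamp"; // assumed key name
    static final String LATEST_FILE_PREFIX = "latest.file";

    long latestModificationTime;
    List<String> latestModifiedStatuses = new ArrayList<>();

    void restore(final Map<String, String> storedState) {
        // Reset both fields first, so a cleared state really restarts the listing from scratch.
        latestModificationTime = 0L;
        latestModifiedStatuses = new ArrayList<>();

        final String timestamp = storedState.get(LATEST_TIMESTAMP_KEY);
        if (timestamp != null) {
            latestModificationTime = Long.parseLong(timestamp);
            for (final Map.Entry<String, String> entry : storedState.entrySet()) {
                if (entry.getKey().startsWith(LATEST_FILE_PREFIX)) {
                    latestModifiedStatuses.add(entry.getValue());
                }
            }
        }
    }
}
```

Without the first assignment, a run that follows a state clear would keep the timestamp from the previous run and skip older files.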



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -452,243 +278,83 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
-
-        final Set<FileStatus> statuses;
-        try {
-            final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", e);
-            return;
-        }
-
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
-                }
-            } else {
-                createFlowFiles(listable, session);
-            }
-        }
-
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
-        }
-
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
-
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
-            session.commitAsync();
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-        }
-    }
-
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
-        }
-    }
-
-    private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
+        final PathFilter pathFilter = createPathFilter(context);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+        final FileStatusManager fileStatusManager = new FileStatusManager();
+        final Path rootPath = getNormalizedPath(context, DIRECTORY);
+        final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
-            }
-
-            writeResult = recordSetWriter.finishRecordSet();
-        }
+        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
 
-        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
+        final HdfsObjectWriter writer = getHdfsObjectWriter(session, writerFactory, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager);
 
-        session.transfer(flowFile, getSuccessRelationship());
-    }
+        writer.write();
 
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
+        getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
 
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) throws IOException, InterruptedException {
-        final Set<FileStatus> statusSet = new HashSet<>();
+        if (writer.getListedFileCount() > 0) {
+            final Map<String, String> updatedState = new HashMap<>(1);

Review Comment:
   ```suggestion
               final Map<String, String> updatedState = new HashMap<>();
   ```
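
For context on this suggestion: the `int` argument of the `HashMap` constructor is an initial capacity, not a size limit, so `new HashMap<>(1)` still works but reads as if the map held a single entry. The sketch below assumes the updated state carries one timestamp plus one entry per latest file; `StateMapSketch` and the exact key names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the state map with the default capacity.
class StateMapSketch {
    static Map<String, String> buildState(final long latestTimestamp, final String... latestFiles) {
        final Map<String, String> updatedState = new HashMap<>();
        updatedState.put("latest.timestamp", String.valueOf(latestTimestamp)); // assumed key name
        for (int i = 0; i < latestFiles.length; i++) {
            updatedState.put("latest.file." + i, latestFiles[i]); // assumed key format
        }
        return updatedState;
    }
}
```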



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();
+
+    public FileStatusIterable(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation) {

Review Comment:
   Minor: Just wondering if we could get rid of either the `FileStatusIterable` or the `FileStatusIterator`. Specifically the `Iterable` is really only used so that we can have a `foreach` instead of a `for` calling `hasNext` and `next` explicitly.
   This is more declarative though so just a thought.
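
The trade-off mentioned here, sketched with a hypothetical `PathsSketch` class over plain strings rather than `FileStatus`. Both helpers visit the same elements; the `Iterable` only buys the for-each syntax.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical Iterable wrapper; it exists only so callers can use for-each.
class PathsSketch implements Iterable<String> {
    private final List<String> paths;

    PathsSketch(final List<String> paths) {
        this.paths = paths;
    }

    @Override
    public Iterator<String> iterator() {
        return paths.iterator();
    }

    // Declarative form: needs an Iterable.
    static int totalLengthWithForEach(final Iterable<String> paths) {
        int total = 0;
        for (final String path : paths) {
            total += path.length();
        }
        return total;
    }

    // Explicit form: an Iterator alone is enough.
    static int totalLengthWithIterator(final Iterator<String> iterator) {
        int total = 0;
        while (iterator.hasNext()) {
            total += iterator.next().length();
        }
        return total;
    }
}
```

Dropping the `Iterable` would mean every caller writes the second form; keeping it costs one small extra class.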
   



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterator implements Iterator<FileStatus> {
+
+    private static final String IO_ERROR_MESSAGE = "IO error occured while iterating HFDS";

Review Comment:
   ```suggestion
       private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating HFDS";
   
   ```



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.OutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
+
+public class RecordObjectWriter implements HdfsObjectWriter {
+
+    private static final RecordSchema RECORD_SCHEMA;
+
+    private static final String FILENAME = "filename";
+    private static final String PATH = "path";
+    private static final String IS_DIRECTORY = "directory";
+    private static final String SIZE = "size";
+    private static final String LAST_MODIFIED = "lastModified";
+    private static final String PERMISSIONS = "permissions";
+    private static final String OWNER = "owner";
+    private static final String GROUP = "group";
+    private static final String REPLICATION = "replication";
+    private static final String IS_SYM_LINK = "symLink";
+    private static final String IS_ENCRYPTED = "encrypted";
+    private static final String IS_ERASURE_CODED = "erasureCoded";
+
+    static {
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.add(new RecordField(FILENAME, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(PATH, RecordFieldType.STRING.getDataType(), false));
+        recordFields.add(new RecordField(IS_DIRECTORY, RecordFieldType.BOOLEAN.getDataType(), false));
+        recordFields.add(new RecordField(SIZE, RecordFieldType.LONG.getDataType(), false));
+        recordFields.add(new RecordField(LAST_MODIFIED, RecordFieldType.TIMESTAMP.getDataType(), false));
+        recordFields.add(new RecordField(PERMISSIONS, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(OWNER, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(GROUP, RecordFieldType.STRING.getDataType()));
+        recordFields.add(new RecordField(REPLICATION, RecordFieldType.INT.getDataType()));
+        recordFields.add(new RecordField(IS_SYM_LINK, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ENCRYPTED, RecordFieldType.BOOLEAN.getDataType()));
+        recordFields.add(new RecordField(IS_ERASURE_CODED, RecordFieldType.BOOLEAN.getDataType()));
+        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
+    }
+
+
+    private final ProcessSession session;
+    private final RecordSetWriterFactory writerFactory;
+    private final ComponentLog logger;
+    private final FileStatusIterable fileStatuses;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;
+    long fileCount;
+
+
+    public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger,
+                              final FileStatusIterable fileStatuses, final long minimumAge, final long maximumAge, final PathFilter pathFilter,
+                              final FileStatusManager fileStatusManager, final long latestModificationTime, final List<String> latestModifiedStatuses) {
+        this.session = session;
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+        this.fileStatuses = fileStatuses;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.latestModificationTime = latestModificationTime;
+        this.latestModifiedStatuses = latestModifiedStatuses;
+        fileCount = 0;
+    }
+
+    @Override
+    public void write() {
+        FlowFile flowFile = session.create();
+
+        final OutputStream out = session.write(flowFile);
+        try (RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)) {
+            recordWriter.beginRecordSet();
+
+            for (FileStatus status : fileStatuses) {
+                if (determineListable(status, minimumAge, maximumAge, pathFilter, latestModificationTime, latestModifiedStatuses)) {
+                    recordWriter.write(createRecordForListing(status));
+                    fileStatusManager.update(status);
+                }
+            }
+
+            WriteResult writeResult = recordWriter.finishRecordSet();
+            fileCount = writeResult.getRecordCount();
+
+            if (fileCount == 0) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (Exception e) {
+            throw new ProcessException("An error occured while writing results", e);

Review Comment:
   ```suggestion
               throw new ProcessException("An error occurred while writing results", e);
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] markap14 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "markap14 (via GitHub)" <gi...@apache.org>.
markap14 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1227161300


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
+import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
+
+public enum FilterMode implements DescribedValue {
+
+    FILTER_DIRECTORIES_AND_FILES(
+            "filter-mode-directories-and-files",
+            "Directories and Files",
+            "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, only subdirectories with a matching name will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+    FILTER_MODE_FILES_ONLY(
+            "filter-mode-files-only",
+            "Files Only",
+            "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, the entire subdirectory tree will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+
+    FILTER_MODE_FULL_PATH(
+            "filter-mode-full-path",
+            "Full Path",
+            "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+                    + " against the full path of files with and without the scheme and authority.  If "
+                    + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                    + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information."
+    );
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    FilterMode(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static FilterMode forName(String filterMode) {

Review Comment:
   Is there a reason we need this `forName` instead of just using `valueOf`?
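
For context on the question above: `Enum.valueOf` resolves only the constant name (for example `FILTER_MODE_FILES_ONLY`), not the value string the enum carries (`filter-mode-files-only`). The body of `forName` is not visible in the quoted diff, so the following is only a minimal sketch, assuming `forName` is meant to look up by that value string. The class `FilterModeLookupSketch`, the simplified `Mode` enum, and the `forName` body are all hypothetical stand-ins, not the PR's code.

```java
import java.util.stream.Stream;

public class FilterModeLookupSketch {

    // Simplified stand-in for FilterMode: constant names and stored values differ.
    enum Mode {
        DIRECTORIES_AND_FILES("filter-mode-directories-and-files"),
        FILES_ONLY("filter-mode-files-only"),
        FULL_PATH("filter-mode-full-path");

        private final String value;

        Mode(final String value) {
            this.value = value;
        }

        String getValue() {
            return value;
        }

        // Hypothetical lookup by the stored value string rather than the constant name.
        static Mode forName(final String value) {
            return Stream.of(values())
                    .filter(mode -> mode.getValue().equalsIgnoreCase(value))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Invalid filter mode: " + value));
        }
    }

    public static void main(final String[] args) {
        // valueOf accepts constant names only.
        System.out.println(Mode.valueOf("FILES_ONLY"));
        // forName accepts the value a property would carry.
        System.out.println(Mode.forName("filter-mode-files-only"));
        try {
            Mode.valueOf("filter-mode-files-only");
        } catch (final IllegalArgumentException e) {
            System.out.println("valueOf rejects the value string");
        }
    }
}
```

If the property value were always the constant name, `valueOf` alone would be enough and the extra method could be dropped.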





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253186098


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();

Review Comment:
   Also, I think the three possible scenarios (legacy, non-legacy, first run) would be easier for the reader to follow if these were final and set explicitly in each of them.
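
A minimal sketch of what this suggestion could look like, assuming the state is available as a plain map. The class `ListingStateSketch`, the `restore` helper, and the `latest.timestamp` / `latest.files` keys are hypothetical; `emitted.timestamp` is taken from the removed debug message, and its use as the legacy marker is an assumption. Declaring both locals `final` makes the compiler verify that every branch assigns them exactly once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListingStateSketch {

    // Assumed key names, for illustration only.
    static final String LEGACY_TIMESTAMP_KEY = "emitted.timestamp";
    static final String TIMESTAMP_KEY = "latest.timestamp";
    static final String FILES_KEY = "latest.files";

    static final class ListingState {
        final long latestTimestamp;
        final List<String> latestFiles;

        ListingState(final long latestTimestamp, final List<String> latestFiles) {
            this.latestTimestamp = latestTimestamp;
            this.latestFiles = latestFiles;
        }
    }

    static ListingState restore(final Map<String, String> state) {
        final long latestTimestamp;
        final List<String> latestFiles;

        if (state.containsKey(LEGACY_TIMESTAMP_KEY)) {
            // Legacy state: only a timestamp was stored, so no file names are known.
            latestTimestamp = Long.parseLong(state.get(LEGACY_TIMESTAMP_KEY));
            latestFiles = new ArrayList<>();
        } else if (state.containsKey(TIMESTAMP_KEY)) {
            // Non-legacy state: timestamp plus the files listed at that timestamp.
            latestTimestamp = Long.parseLong(state.get(TIMESTAMP_KEY));
            final String files = state.getOrDefault(FILES_KEY, "");
            latestFiles = files.isEmpty() ? new ArrayList<>() : new ArrayList<>(Arrays.asList(files.split(",")));
        } else {
            // First run: nothing stored yet.
            latestTimestamp = 0L;
            latestFiles = new ArrayList<>();
        }

        return new ListingState(latestTimestamp, latestFiles);
    }

    public static void main(final String[] args) {
        final Map<String, String> state = new HashMap<>();
        state.put(TIMESTAMP_KEY, "1000");
        state.put(FILES_KEY, "/data/a.txt,/data/b.txt");
        final ListingState restored = restore(state);
        System.out.println(restored.latestTimestamp + " " + restored.latestFiles);
    }
}
```

With this shape the defaults are no longer assigned up front and then silently overwritten, so each scenario reads on its own.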





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1259589406


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml:
##########
@@ -154,6 +154,11 @@
             <groupId>org.glassfish.jaxb</groupId>
             <artifactId>jaxb-runtime</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>compile</scope>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-mock-record-utils</artifactId>
               <scope>test</scope>
   ```





[GitHub] [nifi] Lehel44 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1254222125


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();

Review Comment:
   Thanks for the suggestion, I'll change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1199095603


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -452,243 +305,201 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        final PathFilter pathFilter = createPathFilter(context);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
+        final HdfsObjectWriter writer = getHdfsObjectWriter(session, writerFactory);
+
+        long listedFileCount = 0;
         try {
             final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", e);
-            return;
-        }
-
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
+            final FileCountRemoteIterator<FileStatus> fileStatusIterator = getFileStatusIterator(rootPath, recursive, hdfs, pathFilter);
+
+            final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+            final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+
+            writer.beginListing();
+
+            FileStatus status;
+            while (fileStatusIterator.hasNext()) {
+                status = fileStatusIterator.next();
+                if (status != null && determineListable(status, minimumAge, maximumAge, isTransitioningFromLegacyState, isLegacyLastStatusListed)) {
+                    writer.addToListing(status);
+                    fileStatusManager.update(status);
+                    listedFileCount++;
                 }
-            } else {
-                createFlowFiles(listable, session);
             }
-        }
+            writer.finishListing();
 
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
+            long totalFileCount = fileStatusIterator.getFileCount();
+            getLogger().debug("Found a total of {} files in HDFS, {} are listed", totalFileCount, listedFileCount);
+        } catch (final IOException | IllegalArgumentException | SchemaNotFoundException e) {
+            getLogger().error("Failed to perform listing of HDFS", e);
+            writer.finishListingExceptionally(e);
+            return;
         }
 
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
+        if (listedFileCount > 0) {
+            fileStatusManager.finishIteration();
+            final Map<String, String> updatedState = new HashMap<>(1);
+            updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getLastModificationTime()));
+            getLogger().debug("New state map: {}", updatedState);
+            updateState(session, updatedState);
 
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files from HDFS", new Object[] {listCount});
+            getLogger().info("Successfully created listing with {} new files from HDFS", listedFileCount);
             session.commitAsync();
         } else {
             getLogger().debug("There is no data to list. Yielding.");
             context.yield();
         }
     }
 
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
+    private HdfsObjectWriter getHdfsObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory) {
+        final HdfsObjectWriter writer;
+        if (writerFactory == null) {
+            writer = new FlowFileObjectWriter(session);
+        } else {
+            writer = new RecordObjectWriter(session, writerFactory, getLogger());
         }
+        return writer;
     }
 
-    private void createRecords(final Set<FileStatus> fileStatuses, final ProcessContext context, final ProcessSession session) throws IOException, SchemaNotFoundException {
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = writerFactory.createWriter(getLogger(), getRecordSchema(), out, Collections.emptyMap())) {
+    private boolean notEnoughTimeElapsedToRun(final ProcessContext context) {
+        final long now = System.nanoTime();
+        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
+            context.yield();
+            return true;
+        }
+        lastRunTimestamp = now;
+        return false;
+    }
 
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
+    private boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final boolean isTransitioningFromLegacyState, final boolean isLegacyLastStatusListed) {
+        // If the file was created during the processor's last iteration we have to check if it was already listed
+        // If legacy state was used and the file was already listed once, we don't want to list it once again.
+        if (status.getModificationTime() == fileStatusManager.getLastModificationTime()) {
+            if (isTransitioningFromLegacyState) {
+                return !isLegacyLastStatusListed;
             }
-
-            writeResult = recordSetWriter.finishRecordSet();
+            return !fileStatusManager.getLastModifiedStatuses().contains(status);
         }
 
-        final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
+        final long fileAge = System.currentTimeMillis() - status.getModificationTime();
+        if (minimumAge > fileAge || fileAge > maximumAge) {
+            return false;
+        }
 
-        session.transfer(flowFile, getSuccessRelationship());
+        return status.getModificationTime() > fileStatusManager.getLastModificationTime();
     }
 
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
+    private FileCountRemoteIterator<FileStatus> getFileStatusIterator(final Path path, final boolean recursive, final FileSystem hdfs, final PathFilter filter) {
+        final Deque<Path> pathStack = new ArrayDeque<>();
+        pathStack.push(path);
 
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
+        return new FileCountRemoteIterator<>() {

Review Comment:
   This iterator could be simplified if the stack were handled inside it and the filtering were not done here.
   (`ListHDFS` already does some of the filtering in `determineListable`, so all of it could be combined in one place.)
   It would also probably be better for it to implement `Iterable` instead of `RemoteIterator`.
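
A minimal sketch of that shape, assuming a hypothetical in-memory `Entry` type in place of Hadoop's `FileStatus` (neither `Entry` nor `EntryTree` exists in the PR). The `Iterable` owns the directory stack and returns every file it finds; filtering is left to the caller. The root is assumed to be a directory.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical in-memory stand-in for an HDFS entry; not a Hadoop class.
final class Entry {
    final String name;
    final List<Entry> children; // null for plain files

    private Entry(final String name, final List<Entry> children) {
        this.name = name;
        this.children = children;
    }

    static Entry file(final String name) {
        return new Entry(name, null);
    }

    static Entry dir(final String name, final Entry... children) {
        return new Entry(name, new ArrayList<>(Arrays.asList(children)));
    }

    boolean isDirectory() {
        return children != null;
    }
}

// Iterable that owns the directory stack; it does no filtering of its own.
final class EntryTree implements Iterable<Entry> {
    private final Entry root; // assumed to be a directory
    private final boolean recursive;

    EntryTree(final Entry root, final boolean recursive) {
        this.root = root;
        this.recursive = recursive;
    }

    @Override
    public Iterator<Entry> iterator() {
        final Deque<Entry> directories = new ArrayDeque<>();
        final Deque<Entry> files = new ArrayDeque<>();
        directories.push(root);

        return new Iterator<Entry>() {
            @Override
            public boolean hasNext() {
                // Expand directories lazily until a file is available or nothing is left.
                while (files.isEmpty() && !directories.isEmpty()) {
                    for (final Entry child : directories.pop().children) {
                        if (!child.isDirectory()) {
                            files.add(child);
                        } else if (recursive) {
                            directories.push(child);
                        }
                    }
                }
                return !files.isEmpty();
            }

            @Override
            public Entry next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return files.remove();
            }
        };
    }
}
```

With this shape the caller can write `for (Entry e : new EntryTree(root, recursive))` and apply one combined filter in the loop body.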



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -452,243 +305,201 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        final PathFilter pathFilter = createPathFilter(context);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
+        final HdfsObjectWriter writer = getHdfsObjectWriter(session, writerFactory);
+
+        long listedFileCount = 0;
         try {
             final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", e);
-            return;
-        }
-
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
+            final FileCountRemoteIterator<FileStatus> fileStatusIterator = getFileStatusIterator(rootPath, recursive, hdfs, pathFilter);
+
+            final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+            final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+
+            writer.beginListing();
+
+            FileStatus status;
+            while (fileStatusIterator.hasNext()) {
+                status = fileStatusIterator.next();
+                if (status != null && determineListable(status, minimumAge, maximumAge, isTransitioningFromLegacyState, isLegacyLastStatusListed)) {
+                    writer.addToListing(status);
+                    fileStatusManager.update(status);
+                    listedFileCount++;
                 }
-            } else {
-                createFlowFiles(listable, session);
             }
-        }
+            writer.finishListing();
 
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
+            long totalFileCount = fileStatusIterator.getFileCount();
+            getLogger().debug("Found a total of {} files in HDFS, {} are listed", totalFileCount, listedFileCount);
+        } catch (final IOException | IllegalArgumentException | SchemaNotFoundException e) {
+            getLogger().error("Failed to perform listing of HDFS", e);
+            writer.finishListingExceptionally(e);
+            return;

Review Comment:
   This relates to my previous comment about `HdfsObjectWriter` being too granular.
   When we have a `FlowFileObjectWriter`, we neither remove the created flowfiles from the session nor roll back the session, so the flowfiles will be emitted.
   However, the status won't be saved, so we will emit flowfiles for the same listed files again the next time `onTrigger` runs. This leads to duplicated flowfiles.
   
   Consolidating the `HdfsObjectWriter` interface will probably solve this problem as well.
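
To illustrate the failure mode and one possible way around it, here is a minimal sketch using a hypothetical in-memory `FakeSession` (an assumption for illustration, not the NiFi `ProcessSession` API). If the listing fails midway, the pending entries are discarded, so nothing is emitted unless the run completes and the state can be saved with it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a process session; not the NiFi API.
final class FakeSession {
    private final List<String> pending = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    void create(final String name) {
        pending.add(name);
    }

    void commit() {
        emitted.addAll(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }
}

final class ListingRun {
    // failAt simulates an IOException raised while handling the entry at that index; use -1 for no failure.
    static boolean list(final FakeSession session, final List<String> paths, final int failAt) {
        try {
            for (int i = 0; i < paths.size(); i++) {
                if (i == failAt) {
                    throw new IOException("simulated listing failure");
                }
                session.create(paths.get(i));
            }
        } catch (final IOException e) {
            // Discard what was created so far; the same files are listed again next run without duplicates.
            session.rollback();
            return false;
        }
        // Only a complete listing is emitted (state would be saved at this point as well).
        session.commit();
        return true;
    }
}
```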



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/HdfsObjectWriter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+
+import java.io.IOException;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public interface HdfsObjectWriter {

Review Comment:
   This interface has many methods that are only implemented by one of its two implementing classes. That is usually a sign that the separation of concerns is not adequate.
   In this case, a single `write` method or something similar would probably be better.
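
A rough sketch of what a consolidated interface could look like. All names here (`ListingWriter`, `PerEntryWriter`, `SingleRecordSetWriter`) are hypothetical, and plain strings stand in for `FileStatus` objects and flowfiles. Each implementation keeps its own begin/finish handling private behind the single method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consolidated interface: one method, implemented fully by every writer.
interface ListingWriter {
    // Writes all listable paths and returns how many were written.
    long write(Iterable<String> listablePaths);
}

// Emits one output per listed path (the flowfile-per-file case).
final class PerEntryWriter implements ListingWriter {
    final List<String> emitted = new ArrayList<>();

    @Override
    public long write(final Iterable<String> listablePaths) {
        long count = 0;
        for (final String path : listablePaths) {
            emitted.add(path);
            count++;
        }
        return count;
    }
}

// Emits a single output containing every listed path (the record-set case).
final class SingleRecordSetWriter implements ListingWriter {
    final List<String> emitted = new ArrayList<>();

    @Override
    public long write(final Iterable<String> listablePaths) {
        final StringBuilder recordSet = new StringBuilder(); // "begin" is internal to the writer
        long count = 0;
        for (final String path : listablePaths) {
            if (count > 0) {
                recordSet.append('\n');
            }
            recordSet.append(path);
            count++;
        }
        if (count > 0) {
            emitted.add(recordSet.toString()); // "finish" is internal as well
        }
        return count;
    }
}
```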





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226887367


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public interface HdfsObjectWriter {
+
+    void write();
+
+    long getListedFileCount();
+
+    default boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final PathFilter filter,
+                                      final long latestModificationTime, final List<String> latestModifiedStatuses) {
+        final boolean isCopyInProgress = status.getPath().getName().endsWith("_COPYING_");
+        final boolean isFilterAccepted = filter.accept(status.getPath());
+        if (isCopyInProgress || !isFilterAccepted) {
+            return false;
+        }
+        // If the file was created during the processor's last iteration we have to check if it was already listed
+        if (status.getModificationTime() == latestModificationTime) {
+            return !latestModifiedStatuses.contains(status.getPath().toString());
+        }
+
+        final long fileAge = System.currentTimeMillis() - status.getModificationTime();

Review Comment:
   I see the logic was copied from the original version, but calling `System.currentTimeMillis()` multiple times (individually for each file) may lead to skipped files, so it is also a bug that should be fixed.
   
   Example (min age = 5 sec):
   - T0: file1 is created
   - T0+1: file2 is created
   - T0+4999: file1 is listed but it is not old enough (4999 - 0 = 4999 ms), so it will be skipped in this iteration
   - T0+5001: file2 is listed and it is old enough (5001 - 1 = 5000 ms), so it passes the test and will be processed
   
   The problem is that file2 also sets the latest modification time to T0+1, so file1 will not be listed in the next iteration either.
   
   Using a "global" current time (determined before all listings) would solve this issue.
   In this example, if the base timestamp is T0+4998, then neither file is old enough and both will be processed in the next iteration only.
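
The example above can be reproduced with a small sketch. `ListedFile` and `MinAgeDemo` are hypothetical names, and the clock is simulated with a `LongSupplier` so the timings from the example (with T0 = 0) can be replayed deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a listed file; not part of the PR.
final class ListedFile {
    final String name;
    final long modificationTime;

    ListedFile(final String name, final long modificationTime) {
        this.name = name;
        this.modificationTime = modificationTime;
    }
}

final class MinAgeDemo {
    // Reads the clock once per file, as the current code does with System.currentTimeMillis().
    static List<String> listWithPerFileClock(final List<ListedFile> files, final long minimumAge, final LongSupplier clock) {
        final List<String> accepted = new ArrayList<>();
        for (final ListedFile file : files) {
            final long fileAge = clock.getAsLong() - file.modificationTime;
            if (fileAge >= minimumAge) {
                accepted.add(file.name);
            }
        }
        return accepted;
    }

    // Uses one timestamp captured before the listing starts.
    static List<String> listWithFixedClock(final List<ListedFile> files, final long minimumAge, final long listingTimestamp) {
        final List<String> accepted = new ArrayList<>();
        for (final ListedFile file : files) {
            final long fileAge = listingTimestamp - file.modificationTime;
            if (fileAge >= minimumAge) {
                accepted.add(file.name);
            }
        }
        return accepted;
    }
}
```

With file1 modified at 0 and file2 at 1, a minimum age of 5000 and per-file clock readings of 4999 and 5001, the first method accepts only file2, which is the skip scenario. The second method with a base timestamp of 4998 accepts neither, so both remain eligible for the next iteration.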





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226922484


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
             this.resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestModifiedStatuses = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestListedTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+            String latestFiles = stateMap.get(LATEST_FILES_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestModificationTime = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+            } else if (latestListedTimestampString != null) {
+                latestModificationTime = Long.parseLong(latestListedTimestampString);
+                latestModifiedStatuses = new ArrayList<>(Arrays.asList(latestFiles.split("\\s")));

Review Comment:
   This looks minor, but it is hard to follow why `latestListedTimestampString` becomes `latestModificationTime` after simple parsing.
   Could we not simply name these variables `latestTimestampString`, `latestTimestamp`, `latestFilesString`, and `latestFiles`?
   That would also be more consistent with the state property names.
   I would apply these names in `FileStatusManager` too.
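
   A minimal sketch of how the state restoration could read with the suggested names. The `Map<String, String>` stands in for NiFi's `StateMap`; the class `RestoredState`, the key strings, and the default of 0 when no state is stored are assumptions made only for illustration. The whitespace-separated file list mirrors the diff above.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;

   // Hypothetical holder for the restored listing state; not part of the PR.
   final class RestoredState {
       // Assumed key names, for illustration only.
       static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
       static final String LATEST_FILES_KEY = "latest.files";

       final long latestTimestamp;
       final List<String> latestFiles;

       private RestoredState(final long latestTimestamp, final List<String> latestFiles) {
           this.latestTimestamp = latestTimestamp;
           this.latestFiles = latestFiles;
       }

       // A plain Map stands in for the StateMap used by the processor.
       static RestoredState from(final Map<String, String> state) {
           final String latestTimestampString = state.get(LATEST_TIMESTAMP_KEY);
           final String latestFilesString = state.get(LATEST_FILES_KEY);

           if (latestTimestampString == null) {
               // No stored state: start from an assumed default of 0 and no files.
               return new RestoredState(0L, new ArrayList<>());
           }

           final long latestTimestamp = Long.parseLong(latestTimestampString);
           final List<String> latestFiles = latestFilesString == null || latestFilesString.isEmpty()
                   ? new ArrayList<>()
                   : new ArrayList<>(Arrays.asList(latestFilesString.split("\\s")));
           return new RestoredState(latestTimestamp, latestFiles);
       }
   }
   ```

   Each `...String` variable holds the raw state value and its unsuffixed counterpart holds the parsed form, so the names line up one-to-one with the state keys.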



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] markap14 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "markap14 (via GitHub)" <gi...@apache.org>.
markap14 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1227160286


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
+import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
+
+public enum FilterMode implements DescribedValue {
+
+    FILTER_DIRECTORIES_AND_FILES(
+            "filter-mode-directories-and-files",
+            "Directories and Files",
+            "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, only subdirectories with a matching name will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+    FILTER_MODE_FILES_ONLY(
+            "filter-mode-files-only",
+            "Files Only",
+            "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, the entire subdirectory tree will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+
+    FILTER_MODE_FULL_PATH(
+            "filter-mode-full-path",
+            "Full Path",
+            "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+                    + " against the full path of files with and without the scheme and authority.  If "
+                    + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                    + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information."
+    );
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    FilterMode(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static FilterMode forName(String filterMode) {
+        return Stream.of(values())
+                .filter(fm -> fm.getValue().equalsIgnoreCase(filterMode))
+                .findFirst()
+                .orElseThrow(
+                        () -> new IllegalArgumentException("Invalid SnowflakeInternalStageType: " + filterMode));

Review Comment:
   This seems to be a copy/paste error: the message refers to `SnowflakeInternalStageType` rather than `FilterMode`.





[GitHub] [nifi] Lehel44 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1228820394


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FilterMode.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.stream.Stream;
+
+import static org.apache.nifi.processors.hadoop.ListHDFS.FILE_FILTER;
+import static org.apache.nifi.processors.hadoop.ListHDFS.RECURSE_SUBDIRS;
+
+public enum FilterMode implements DescribedValue {
+
+    FILTER_DIRECTORIES_AND_FILES(
+            "filter-mode-directories-and-files",
+            "Directories and Files",
+            "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, only subdirectories with a matching name will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+    FILTER_MODE_FILES_ONLY(
+            "filter-mode-files-only",
+            "Files Only",
+            "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
+                    + " is set to true, the entire subdirectory tree will be searched for files that match "
+                    + "the regular expression defined in " + FILE_FILTER.getDisplayName() + "."
+    ),
+
+    FILTER_MODE_FULL_PATH(
+            "filter-mode-full-path",
+            "Full Path",
+            "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+                    + " against the full path of files with and without the scheme and authority.  If "
+                    + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                    + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information."
+    );
+
+    private final String value;
+    private final String displayName;
+    private final String description;
+
+    FilterMode(final String value, final String displayName, final String description) {
+        this.value = value;
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    public static FilterMode forName(String filterMode) {

Review Comment:
   With `valueOf`, the name must exactly match the identifier used to declare the enum constant. In `createPathFilter`, the call
   `final FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());`
   reads the property value from the context, which is the `value` member of the enum and differs from the constant's name.
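
   The difference can be sketched with a trimmed-down stand-in for the enum. `FilterModeSketch` is a hypothetical name; it keeps the constants and value strings from the diff but omits the display names and descriptions, and its error message is only illustrative.

   ```java
   import java.util.stream.Stream;

   // Simplified stand-in for the FilterMode enum in the diff; descriptions are omitted.
   enum FilterModeSketch {
       FILTER_DIRECTORIES_AND_FILES("filter-mode-directories-and-files"),
       FILTER_MODE_FILES_ONLY("filter-mode-files-only"),
       FILTER_MODE_FULL_PATH("filter-mode-full-path");

       private final String value;

       FilterModeSketch(final String value) {
           this.value = value;
       }

       String getValue() {
           return value;
       }

       // Resolves a constant by its value member, which is what the property holds.
       static FilterModeSketch forName(final String filterMode) {
           return Stream.of(values())
                   .filter(fm -> fm.getValue().equalsIgnoreCase(filterMode))
                   .findFirst()
                   .orElseThrow(() -> new IllegalArgumentException("Invalid FilterMode: " + filterMode));
       }
   }
   ```

   Here `FilterModeSketch.forName("filter-mode-files-only")` resolves to `FILTER_MODE_FILES_ONLY`, while `FilterModeSketch.valueOf("filter-mode-files-only")` throws an `IllegalArgumentException` because no constant is declared with that identifier.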





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1259588150


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml:
##########
@@ -154,6 +154,10 @@
             <groupId>org.glassfish.jaxb</groupId>
             <artifactId>jaxb-runtime</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-mock-record-utils</artifactId>
               <scope>compile</scope>
           </dependency>
   ```





[GitHub] [nifi] turcsanyip commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "turcsanyip (via GitHub)" <gi...@apache.org>.
turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1252713068


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;

Review Comment:
   The underlying `FileSystem` is not necessarily HDFS; it can also be cloud storage.
   I would rename it to `fs` or `fileSystem`.



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public abstract class HdfsObjectWriter {
+
+    protected final ProcessSession session;
+    protected final FileStatusIterable fileStatusIterable;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;
+    final long currentTimeMillis;
+    long fileCount;
+
+
+    HdfsObjectWriter(ProcessSession session, FileStatusIterable fileStatusIterable, long minimumAge, long maximumAge, PathFilter pathFilter,
+                     FileStatusManager fileStatusManager, long latestModificationTime, List<String> latestModifiedStatuses) {
+        this.session = session;
+        this.fileStatusIterable = fileStatusIterable;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.latestModificationTime = latestModificationTime;
+        this.latestModifiedStatuses = latestModifiedStatuses;
+        currentTimeMillis = System.currentTimeMillis();
+        fileCount = 0L;
+    }
+
+    public abstract void write();
+
+    public long getListedFileCount() {
+        return fileCount;
+    }
+
+    boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final PathFilter filter,

Review Comment:
   Please use `protected` visibility for members that are meant to be accessed by subclasses (even if they are in the same package).
   This applies to the fields above as well.
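
   A minimal sketch of the requested visibility, assuming a trimmed-down hierarchy. `ObjectWriterSketch` and `CountingWriterSketch` are hypothetical names, and `write(long)` takes a file age only to keep the example self-contained.

   ```java
   // Hypothetical, trimmed-down writer hierarchy; names are illustrative only.
   abstract class ObjectWriterSketch {
       // protected: meant for subclasses, regardless of the package they live in
       protected final long minimumAge;
       protected final long maximumAge;
       protected long fileCount;

       protected ObjectWriterSketch(final long minimumAge, final long maximumAge) {
           this.minimumAge = minimumAge;
           this.maximumAge = maximumAge;
       }

       public abstract void write(long fileAge);

       public long getListedFileCount() {
           return fileCount;
       }

       // Helper shared with subclasses; both bounds are inclusive.
       protected boolean isWithinAgeRange(final long fileAge) {
           return fileAge >= minimumAge && fileAge <= maximumAge;
       }
   }

   final class CountingWriterSketch extends ObjectWriterSketch {
       CountingWriterSketch(final long minimumAge, final long maximumAge) {
           super(minimumAge, maximumAge);
       }

       @Override
       public void write(final long fileAge) {
           if (isWithinAgeRange(fileAge)) {
               fileCount++;
           }
       }
   }
   ```

   Package-private members would compile equally well while every writer stays in one package; `protected` documents the intent and keeps the members reachable if a subclass is ever moved elsewhere.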



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();
+
+    public FileStatusIterable(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation) {
+        this.path = path;
+        this.recursive = recursive;
+        this.hdfs = hdfs;
+        this.userGroupInformation = userGroupInformation;
+    }
+
+    @Override
+    public Iterator<FileStatus> iterator() {
+        return new FileStatusIterator();
+    }
+
+    public long getTotalFileCount() {
+        return totalFileCount.get();
+    }
+
+    class FileStatusIterator implements Iterator<FileStatus> {
+
+        private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating HFDS";
+
+        private final Deque<Path> dirStatuses;
+
+        private FileStatus nextFileStatus;
+        private RemoteIterator<FileStatus> hdfsIterator;

Review Comment:
   The underlying `FileSystem` is not necessarily HDFS; it can also be cloud storage.
   I would rename it to `remoteIterator`.



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                this.latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager();

Review Comment:
   A new file may arrive with the same timestamp as the latest one(s) from the previous run while no newer files appear in this round (so there is no new timestamp). In that case, the file should be appended to the entries already stored in the state with the same latest timestamp.
   So I believe `FileStatusManager` should be initialized with `latestTimestamp` and `latestFiles` from `onTrigger()` instead of the 0 / empty list of its default constructor.
   It would then also be enough to pass `FileStatusManager` to the object writers, which can retrieve the previous latest values from it in their constructors.
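
   The scenario can be sketched as follows. `FileStatusManagerSketch` is a hypothetical stand-in, not the PR's `FileStatusManager`; files are represented by path strings, and callers are assumed to skip files that were already listed in the previous run.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch of a manager seeded with the previous run's state.
   final class FileStatusManagerSketch {
       private long latestTimestamp;
       private final List<String> latestFiles;

       FileStatusManagerSketch(final long initialLatestTimestamp, final List<String> initialLatestFiles) {
           this.latestTimestamp = initialLatestTimestamp;
           this.latestFiles = new ArrayList<>(initialLatestFiles);
       }

       // Records a newly listed file identified by its path and modification time.
       void update(final String path, final long modificationTime) {
           if (modificationTime > latestTimestamp) {
               // A newer timestamp replaces the previously tracked files.
               latestTimestamp = modificationTime;
               latestFiles.clear();
               latestFiles.add(path);
           } else if (modificationTime == latestTimestamp) {
               // Same timestamp as the stored one: append instead of losing the old entries.
               latestFiles.add(path);
           }
       }

       long getLatestTimestamp() {
           return latestTimestamp;
       }

       List<String> getLatestFiles() {
           return new ArrayList<>(latestFiles);
       }
   }
   ```

   Seeded with timestamp 100 and `/a`, a later `update("/b", 100)` leaves both `/a` and `/b` tracked for timestamp 100. A manager starting from 0 and an empty list would instead store only `/b`, so `/a` could be listed again in the next run.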



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public abstract class HdfsObjectWriter {
+
+    protected final ProcessSession session;
+    protected final FileStatusIterable fileStatusIterable;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;
+    final long currentTimeMillis;
+    long fileCount;
+
+
+    HdfsObjectWriter(ProcessSession session, FileStatusIterable fileStatusIterable, long minimumAge, long maximumAge, PathFilter pathFilter,
+                     FileStatusManager fileStatusManager, long latestModificationTime, List<String> latestModifiedStatuses) {
+        this.session = session;
+        this.fileStatusIterable = fileStatusIterable;
+        this.minimumAge = minimumAge;
+        this.maximumAge = maximumAge;
+        this.pathFilter = pathFilter;
+        this.fileStatusManager = fileStatusManager;
+        this.latestModificationTime = latestModificationTime;
+        this.latestModifiedStatuses = latestModifiedStatuses;
+        currentTimeMillis = System.currentTimeMillis();
+        fileCount = 0L;
+    }
+
+    public abstract void write();
+
+    public long getListedFileCount() {
+        return fileCount;
+    }
+
+    boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final PathFilter filter,

Review Comment:
   Why are `minimumAge`, `maximumAge`, etc. parameters?
   Those fields could directly be accessed in the method.
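   
   A rough sketch of what I mean, under simplifying assumptions: the class name `AgeFilterSketch` is hypothetical, a plain modification time stands in for `FileStatus`, and the path filter and other checks are left out. Only the per-file value is passed in; the age bounds come from the fields.
   
   ```java
   // Hypothetical, reduced version of the writer's listing check.
   class AgeFilterSketch {
       final long minimumAge;
       final long maximumAge;
       final long currentTimeMillis;
   
       AgeFilterSketch(final long minimumAge, final long maximumAge, final long currentTimeMillis) {
           this.minimumAge = minimumAge;
           this.maximumAge = maximumAge;
           this.currentTimeMillis = currentTimeMillis;
       }
   
       // Reads minimumAge / maximumAge from the fields instead of taking them as parameters.
       boolean determineListable(final long modificationTime) {
           final long fileAge = currentTimeMillis - modificationTime;
           return fileAge >= minimumAge && fileAge <= maximumAge;
       }
   }
   ```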



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();
+
+    public FileStatusIterable(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation) {
+        this.path = path;
+        this.recursive = recursive;
+        this.hdfs = hdfs;
+        this.userGroupInformation = userGroupInformation;
+    }
+
+    @Override
+    public Iterator<FileStatus> iterator() {
+        return new FileStatusIterator();
+    }
+
+    public long getTotalFileCount() {
+        return totalFileCount.get();
+    }
+
+    class FileStatusIterator implements Iterator<FileStatus> {
+
+        private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating HFDS";
+
+        private final Deque<Path> dirStatuses;

Review Comment:
   ```suggestion
           private final Deque<Path> dirPaths;
   ```



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public abstract class HdfsObjectWriter {

Review Comment:
   `Hdfs` is not the best name here either and `Object` is too generic. Maybe `HadoopFileStatusWriter`?



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public abstract class HdfsObjectWriter {
+
+    protected final ProcessSession session;
+    protected final FileStatusIterable fileStatusIterable;
+    final long minimumAge;
+    final long maximumAge;
+    final PathFilter pathFilter;
+    final FileStatusManager fileStatusManager;
+    final long latestModificationTime;
+    final List<String> latestModifiedStatuses;

Review Comment:
   `latestModifiedStatuses` contains file paths, not `FileStatus` objects, so the `Statuses` suffix is incorrect.
   
   Also, I would use the same terminology as in `ListHDFS` and `FileStatusManager`: `latestTimestamp`, `latestFiles`.
   
   The difference that should be emphasized here, in my opinion, is that these fields contain the latest values from the _previous_ run, whereas `FileStatusManager` collects the _new_ latest values.
   For this reason, I suggest the names:
   - `previousLatestTimestamp`
   - `previousLatestFiles`



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();

Review Comment:
   I don't think `FileStatusIterable` is ever used concurrently, so a plain `long` would be enough.



##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();

Review Comment:
   These fields should be local variables: as fields, their state is preserved between runs unnecessarily, since it is overwritten at the beginning of the next `onTrigger()`.





[GitHub] [nifi] tpalfy commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253211213


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                this.latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager();

Review Comment:
   Also, a heads-up about passing the file list. In the non-legacy case it is created like this:
   ```
   latestFiles = stateMap.toMap().entrySet().stream()
                           .filter(entry -> entry.getKey().startsWith("latest.file"))
                           .map(Map.Entry::getValue)
                           .collect(Collectors.toList());
   ```
   Although this is usually a mutable list, the `Collectors.toList()` specification doesn't guarantee it. So the `FileStatusManager` constructor should create an instance of its own, for example:
   ```
       public FileStatusManager(final long initialLatestTimestamp, final List<String> initialLatestFiles) {
           this.latestTimestamp = initialLatestTimestamp;
           this.latestFiles = new ArrayList<>(initialLatestFiles);
       }
   ```
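   
   As an alternative sketch (my assumption, not something in the PR), the list type can be fixed at collection time with `Collectors.toCollection(ArrayList::new)`. The class and method names below are hypothetical, and a plain `Map` stands in for the `StateMap` contents:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   // Hypothetical helper; only the "latest.file" prefix comes from the diff above.
   class LatestFilesSketch {
   
       // Collects the values of all keys starting with "latest.file" into a list
       // that is guaranteed to be a mutable ArrayList.
       static List<String> readLatestFiles(final Map<String, String> state) {
           return state.entrySet().stream()
                   .filter(entry -> entry.getKey().startsWith("latest.file"))
                   .map(Map.Entry::getValue)
                   .collect(Collectors.toCollection(ArrayList::new));
       }
   }
   ```
   
   The defensive copy in the constructor is still the safer choice, because it also protects the manager from later changes made by the caller to the list it passed in.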





[GitHub] [nifi] Lehel44 commented on a diff in pull request #7240: NIFI-11178: Improve ListHDFS performance, incremental loading refactor.

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253703892


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/FileStatusIterable.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FileStatusIterable implements Iterable<FileStatus> {
+
+    private final Path path;
+    private final boolean recursive;
+    private final FileSystem hdfs;
+    private final UserGroupInformation userGroupInformation;
+    private final AtomicLong totalFileCount = new AtomicLong();
+
+    public FileStatusIterable(final Path path, final boolean recursive, final FileSystem hdfs, final UserGroupInformation userGroupInformation) {
+        this.path = path;
+        this.recursive = recursive;
+        this.hdfs = hdfs;
+        this.userGroupInformation = userGroupInformation;
+    }
+
+    @Override
+    public Iterator<FileStatus> iterator() {
+        return new FileStatusIterator();
+    }
+
+    public long getTotalFileCount() {
+        return totalFileCount.get();
+    }
+
+    class FileStatusIterator implements Iterator<FileStatus> {
+
+        private static final String IO_ERROR_MESSAGE = "IO error occurred while iterating HFDS";
+
+        private final Deque<Path> dirStatuses;
+
+        private FileStatus nextFileStatus;
+        private RemoteIterator<FileStatus> hdfsIterator;

Review Comment:
   Thanks for the suggestion. The outer custom iterator class is also called `FileStatusIterator`, so I'd avoid the "hiding field" code smell; `remoteIterator` works well.


