You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/08/28 15:32:00 UTC

[1/2] nifi git commit: NIFI-4069: Make ListXXX work with timestamp precision in seconds or minutes

Repository: nifi
Updated Branches:
  refs/heads/master a4e729c7a -> e68ff153e


NIFI-4069: Make ListXXX work with timestamp precision in seconds or minutes

- Refactored variable names to better represent what they are meant for.
- Added deterministic logic which detects the target filesystem's timestamp precision and adjusts the lag time based on it.
- Changed from using System.nanoTime() to System.currentTimeMillis() in tests, because the Java File API reports timestamps in milliseconds at best. Also, System.nanoTime() should not be mixed with epoch milliseconds, because it uses an arbitrary origin and is measured differently.
- Changed TestListFile to use longer intervals between the file timestamps used by testFilterAge. This gives more consistent test results, because sleep time can be longer on filesystems whose timestamps have only seconds precision.
- Added logging to TestListFile.
- Added a TestWatcher that dumps state when an assertion fails, for further investigation.
- Added a Timestamp Precision property so that users can set the precision explicitly if auto-detection is not sufficient.
- Adjusted timestamps for the ages test.

This closes #1915.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/28ee7022
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/28ee7022
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/28ee7022

Branch: refs/heads/master
Commit: 28ee70222b892fb799f5f74a31a9de678d9fb629
Parents: a4e729c
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Jun 14 15:21:01 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Aug 28 11:31:03 2017 -0400

----------------------------------------------------------------------
 .../nifi-processor-utils/pom.xml                |   2 +-
 .../util/list/AbstractListProcessor.java        | 161 +++++++++++-----
 .../util/list/ListProcessorTestWatcher.java     | 128 +++++++++++++
 .../processor/util/list/ListableEntity.java     |   2 +-
 .../util/list/TestAbstractListProcessor.java    | 190 +++++++++++++------
 .../nifi/processors/standard/ListFTP.java       |   1 +
 .../nifi/processors/standard/ListFile.java      |   1 +
 .../nifi/processors/standard/ListSFTP.java      |   1 +
 .../nifi/processors/standard/TestListFile.java  | 107 ++++++++---
 9 files changed, 463 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 3f5e60c..dd38e10 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <scope>test</scope>
+            <scope>compile</scope>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 2666e2c..8d93a65 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -37,6 +38,7 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -138,14 +140,42 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         .identifiesControllerService(DistributedMapCacheClient.class)
         .build();
 
+    public static final AllowableValue PRECISION_AUTO_DETECT = new AllowableValue("auto-detect", "Auto Detect",
+    "Automatically detect time unit deterministically based on candidate entries timestamp."
+            + " Please note that this option may take longer to list entities unnecessarily, if none of entries has a precise precision timestamp."
+            + " E.g. even if a target system supports millis, if all entries only have timestamps without millis, such as '2017-06-16 09:06:34.000', then its precision is determined as 'seconds'.");
+    public static final AllowableValue PRECISION_MILLIS = new AllowableValue("millis", "Milliseconds",
+            "This option provides the minimum latency for an entry from being available to being listed if target system supports millis, if not, use other options.");
+    public static final AllowableValue PRECISION_SECONDS = new AllowableValue("seconds", "Seconds","For a target system that does not have millis precision, but has in seconds.");
+    public static final AllowableValue PRECISION_MINUTES = new AllowableValue("minutes", "Minutes", "For a target system that only supports precision in minutes.");
+
+    public static final PropertyDescriptor TARGET_SYSTEM_TIMESTAMP_PRECISION = new PropertyDescriptor.Builder()
+        .name("target-system-timestamp-precision")
+        .displayName("Target System Timestamp Precision")
+        .description("Specify timestamp precision at the target system."
+                + " Since this processor uses timestamp of entities to decide which should be listed, it is crucial to use the right timestamp precision.")
+        .required(true)
+        .allowableValues(PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS, PRECISION_MINUTES)
+        .defaultValue(PRECISION_AUTO_DETECT.getValue())
+        .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("All FlowFiles that are received are routed to success")
         .build();
 
-    private volatile Long lastListingTime = null;
-    private volatile Long lastProcessedTime = 0L;
-    private volatile Long lastRunTime = 0L;
+    /**
+     * Represents the timestamp of an entity which was the latest one within those listed at the previous cycle.
+     * It does not necessarily mean it has been processed as well.
+     * Whether it was processed or not depends on target system time precision and how old the entity timestamp was.
+     */
+    private volatile Long lastListedLatestEntryTimestampMillis = null;
+    /**
+     * Represents the timestamp of an entity which was the latest one
+     * within those picked up and written to the output relationship at the previous cycle.
+     */
+    private volatile Long lastProcessedLatestEntryTimestampMillis = 0L;
+    private volatile Long lastRunTimeNanos = 0L;
     private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetState = false;
 
@@ -154,9 +184,16 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
      * files according to timestamp, it is ensured that at least the specified millis has been eclipsed to avoid getting scheduled
      * near instantaneously after the prior iteration effectively voiding the built in buffer
      */
-    public static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
-    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
-    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
+    public static final Map<TimeUnit, Long> LISTING_LAG_MILLIS;
+    static {
+        final Map<TimeUnit, Long> nanos = new HashMap<>();
+        nanos.put(TimeUnit.MILLISECONDS, 100L);
+        nanos.put(TimeUnit.SECONDS, 1_000L);
+        nanos.put(TimeUnit.MINUTES, 60_000L);
+        LISTING_LAG_MILLIS = Collections.unmodifiableMap(nanos);
+    }
+    static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
+    static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
 
     public File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
@@ -166,6 +203,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         return properties;
     }
 
@@ -208,7 +246,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         }
 
         // When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
-        if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
+        if (lastListedLatestEntryTimestampMillis != null && stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY) == null) {
             getLogger().info("Detected that state was cleared for this component.  Resetting internal values.");
             resetTimeStates();
         }
@@ -283,10 +321,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         }
     }
 
-    private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException {
+    private void persist(final long latestListedEntryTimestampThisCycleMillis,
+                         final long lastProcessedLatestEntryTimestampMillis,
+                         final StateManager stateManager, final Scope scope) throws IOException {
         final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp));
-        updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp));
+        updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
+        updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
         stateManager.setState(updatedState, scope);
     }
 
@@ -303,26 +343,26 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        Long minTimestamp = lastListingTime;
+        Long minTimestampToListMillis = lastListedLatestEntryTimestampMillis;
 
-        if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) {
+        if (this.lastListedLatestEntryTimestampMillis == null || this.lastProcessedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
             try {
                 // Attempt to retrieve state from the state manager if a last listing was not yet established or
                 // if just elected the primary node
                 final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
-                final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                final String lastProcessedString= stateMap.get(PROCESSED_TIMESTAMP_KEY);
-                if (lastProcessedString != null) {
-                    this.lastProcessedTime = Long.parseLong(lastProcessedString);
+                final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
+                final String lastProcessedLatestEntryTimestampString= stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
+                if (lastProcessedLatestEntryTimestampString != null) {
+                    this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
                 }
-                if (listingTimestampString != null) {
-                    minTimestamp = Long.parseLong(listingTimestampString);
+                if (latestListedEntryTimestampString != null) {
+                    minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
                     // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
-                    if (minTimestamp == this.lastListingTime) {
+                    if (minTimestampToListMillis == this.lastListedLatestEntryTimestampMillis) {
                         context.yield();
                         return;
                     } else {
-                        this.lastListingTime = minTimestamp;
+                        this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
                     }
                 }
                 justElectedPrimaryNode = false;
@@ -334,10 +374,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         }
 
         final List<T> entityList;
-        final long currentListingTimestamp = System.nanoTime();
+        final long currentRunTimeNanos = System.nanoTime();
+        final long currentRunTimeMillis = System.currentTimeMillis();
         try {
             // track of when this last executed for consideration of the lag nanos
-            entityList = performListing(context, minTimestamp);
+            entityList = performListing(context, minTimestampToListMillis);
         } catch (final IOException e) {
             getLogger().error("Failed to perform listing on remote host due to {}", e);
             context.yield();
@@ -349,14 +390,22 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             return;
         }
 
-        Long latestListingTimestamp = null;
+        Long latestListedEntryTimestampThisCycleMillis = null;
         final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
 
         // Build a sorted map to determine the latest possible entries
+        boolean targetSystemHasMilliseconds = false;
+        boolean targetSystemHasSeconds = false;
         for (final T entity : entityList) {
-            final long entityTimestamp = entity.getTimestamp();
+            final long entityTimestampMillis = entity.getTimestamp();
+            if (!targetSystemHasMilliseconds) {
+                targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
+            }
+            if (!targetSystemHasSeconds) {
+                targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
+            }
             // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime;
+            final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
 
             if (newEntry) {
                 List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@@ -371,25 +420,43 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         int flowfilesCreated = 0;
 
         if (orderedEntries.size() > 0) {
-            latestListingTimestamp = orderedEntries.lastKey();
+            latestListedEntryTimestampThisCycleMillis = orderedEntries.lastKey();
+
+            // Determine target system time precision.
+            final String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+            final TimeUnit targetSystemTimePrecision
+                    = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+                        ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+                    : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
+                    : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
+            final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
 
             // If the last listing time is equal to the newest entries previously seen,
             // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp.equals(lastListingTime)) {
-                /* We are done when either:
-                 *   - the latest listing timestamp is If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
-                 *   - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
+            if (latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis)) {
+                /* We need to wait for another cycle when either:
+                 *   - If we have not eclipsed the minimal listing lag needed due to being triggered too soon after the last run
+                 *   - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
                  */
-                if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) {
+                final long  listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
+                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
                     context.yield();
                     return;
                 }
 
-            } else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
+            } else {
+                // Convert minimum reliable timestamp into target system time unit, in order to truncate unreliable digits.
+                final long minimumReliableTimestampInFilesystemTimeUnit = targetSystemTimePrecision.convert(currentRunTimeMillis - listingLagMillis, TimeUnit.MILLISECONDS);
+                final long minimumReliableTimestampMillis = targetSystemTimePrecision.toMillis(minimumReliableTimestampInFilesystemTimeUnit);
+                // If the latest listed entity is not old enough, compared with the minimum timestamp, then wait for another cycle.
+                // The minimum timestamp should be reliable to determine that no further entries will be added with the same timestamp based on the target system time precision.
+                if (minimumReliableTimestampMillis < latestListedEntryTimestampThisCycleMillis) {
+                    // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
+                    orderedEntries.remove(latestListedEntryTimestampThisCycleMillis);
+                }
             }
 
+
             for (List<T> timestampEntities : orderedEntries.values()) {
                 for (T entity : timestampEntities) {
                     // Create the FlowFile for this path.
@@ -403,18 +470,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         }
 
         // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
-        if (latestListingTimestamp != null) {
+        if (latestListedEntryTimestampThisCycleMillis != null) {
             boolean processedNewFiles = flowfilesCreated > 0;
             if (processedNewFiles) {
-                // If there have been files created, update the last timestamp we processed
-                lastProcessedTime = orderedEntries.lastKey();
+                // If there have been files created, update the last timestamp we processed.
+                // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
+                // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+                lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
                 getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
                 session.commit();
             }
 
-            lastRunTime = System.nanoTime();
+            lastRunTimeNanos = currentRunTimeNanos;
 
-            if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) {
+            if (!latestListedEntryTimestampThisCycleMillis.equals(lastListedLatestEntryTimestampMillis) || processedNewFiles) {
                 // We have performed a listing and pushed any FlowFiles out that may have been generated
                 // Now, we need to persist state about the Last Modified timestamp of the newest file
                 // that we evaluated. We do this in order to avoid pulling in the same file twice.
@@ -424,8 +493,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 // We also store the state locally so that if the node is restarted, and the node cannot contact
                 // the distributed state cache, the node can continue to run (if it is primary node).
                 try {
-                    lastListingTime = latestListingTimestamp;
-                    persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context));
+                    lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
+                    persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
                 } catch (final IOException ioe) {
                     getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
                         + "if another node begins executing this Processor, data duplication may occur.", ioe);
@@ -437,8 +506,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             context.yield();
 
             // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
-            if (lastListingTime == null) {
-                lastListingTime = 0L;
+            if (lastListedLatestEntryTimestampMillis == null) {
+                lastListedLatestEntryTimestampMillis = 0L;
             }
 
             return;
@@ -446,9 +515,9 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     }
 
     private void resetTimeStates() {
-        lastListingTime = null;
-        lastProcessedTime = 0L;
-        lastRunTime = 0L;
+        lastListedLatestEntryTimestampMillis = null;
+        lastProcessedLatestEntryTimestampMillis = 0L;
+        lastRunTimeNanos = 0L;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
new file mode 100644
index 0000000..dcb53e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListProcessorTestWatcher.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides a way to dump listable entities, processor state and FlowFiles transferred to the 'success' relationship,
+ * which is useful for debugging test issues, especially in automated test environments such as Travis where debugging is difficult.
+ */
+public class ListProcessorTestWatcher extends TestWatcher {
+
+    private static final Logger logger = LoggerFactory.getLogger(ListProcessorTestWatcher.class);
+    private static final Consumer<String> logStateDump = logger::info;
+
+    @FunctionalInterface
+    public interface Provider<T> {
+        T provide();
+    }
+
+    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+    private final Provider<Map<String, String>> stateMapProvider;
+    private final Provider<List<ListableEntity>> entitiesProvider;
+    private final Provider<List<FlowFile>> successFlowFilesProvider;
+
+    private long startedAtMillis;
+
+    public ListProcessorTestWatcher(Provider<Map<String, String>> stateMapProvider, Provider<List<ListableEntity>> entitiesProvider, Provider<List<FlowFile>> successFlowFilesProvider) {
+        this.stateMapProvider = stateMapProvider;
+        this.entitiesProvider = entitiesProvider;
+        this.successFlowFilesProvider = successFlowFilesProvider;
+    }
+
+    private void log(Consumer<String> dumper, String format, Object ... args) {
+        dumper.accept(String.format(format, args));
+    }
+
+    public void dumpState(final long start) {
+        dumpState(logStateDump, stateMapProvider.provide(), entitiesProvider.provide(), successFlowFilesProvider.provide(), start);
+    }
+
+    private void dumpState(Consumer<String> d, final Map<String, String> state, final List<ListableEntity> entities, final List<FlowFile> flowFiles, final long start) {
+
+        final long nTime = System.currentTimeMillis();
+        log(d, "--------------------------------------------------------------------");
+        log(d, "%-19s   %-13s %-23s %s", "", "timestamp", "date from timestamp", "t0 delta");
+        log(d, "%-19s   %-13s %-23s %s", "-------------------", "-------------", "-----------------------", "--------");
+        log(d, "%-19s = %13d %s %8d", "started at", start, dateFormat.format(start), 0);
+        log(d, "%-19s = %13d %s %8d", "current time", nTime, dateFormat.format(nTime), 0);
+        log(d, "---- processor state -----------------------------------------------");
+        if (state.containsKey("processed.timestamp")) {
+            final long pTime = Long.parseLong(state.get("processed.timestamp"));
+            log(d, "%19s = %13d %s %8d", "processed.timestamp", pTime, dateFormat.format(pTime), pTime - nTime);
+        } else {
+            log(d, "%19s = na", "processed.timestamp");
+        }
+        if (state.containsKey("listing.timestamp")) {
+            final long lTime = Long.parseLong(state.get("listing.timestamp"));
+            log(d, "%19s = %13d %s %8d", "listing.timestamp", lTime, dateFormat.format(lTime), lTime - nTime);
+        } else {
+            log(d, "%19s = na", "listing.timestamp");
+        }
+        log(d, "---- input folder contents -----------------------------------------");
+        entities.sort(Comparator.comparing(ListableEntity::getIdentifier));
+        for (ListableEntity entity : entities) {
+            log(d, "%19s = %12d %s %8d", entity.getIdentifier(), entity.getTimestamp(), dateFormat.format(entity.getTimestamp()), entity.getTimestamp() - nTime);
+        }
+        log(d, "---- output flowfiles ----------------------------------------------");
+        final Map<String, Long> fileTimes = entities.stream().collect(Collectors.toMap(ListableEntity::getIdentifier, ListableEntity::getTimestamp));
+        for (FlowFile ff : flowFiles) {
+            String fName = ff.getAttribute(CoreAttributes.FILENAME.key());
+            Long fTime = fileTimes.get(fName);
+            log(d, "%19s = %13d %s %8d", fName, fTime, dateFormat.format(fTime), fTime - nTime);
+        }
+        log(d, "REL_SUCCESS count = " + flowFiles.size());
+        log(d, "--------------------------------------------------------------------");
+        log(d, "");
+    }
+
+    @Override
+    protected void starting(Description description) {
+        startedAtMillis = System.currentTimeMillis();
+    }
+
+    /**
+     * Throw additional AssertionError with stateDump as its message.
+     */
+    @Override
+    protected void failed(Throwable e, Description description) {
+        if (!(e instanceof AssertionError)) {
+            return;
+        }
+
+        final StringBuilder msg = new StringBuilder("State dump:\n");
+        dumpState(s -> msg.append(s).append("\n"),
+                stateMapProvider.provide(),
+                entitiesProvider.provide(),
+                successFlowFilesProvider.provide(),
+                startedAtMillis);
+        throw new AssertionError(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
index 3c7c08d..01837cb 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
@@ -32,7 +32,7 @@ public interface ListableEntity {
 
 
     /**
-     * @return the timestamp for this entity so that we can be efficient about not performing listings of the same
+     * @return the timestamp for this entity in milliseconds so that we can be efficient about not performing listings of the same
      *         entities multiple times
      */
     long getTimestamp();

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 2417d52..69705f2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.Charsets;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -40,37 +41,78 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.list.AbstractListProcessor;
-import org.apache.nifi.processor.util.list.ListableEntity;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestWatcher;
 
 public class TestAbstractListProcessor {
 
-    static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+    /**
+     * @return current timestamp in milliseconds, but truncated at specified target precision (e.g. SECONDS or MINUTES).
+     */
+    private static long getCurrentTimestampMillis(final TimeUnit targetPrecision) {
+        final long timestampInTargetPrecision = targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, targetPrecision);
+    }
+
+    private static long getSleepMillis(final TimeUnit targetPrecision) {
+        return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 2;
+    }
+
+    private static final long DEFAULT_SLEEP_MILLIS = getSleepMillis(TimeUnit.MILLISECONDS);
+
+    private ConcreteListProcessor proc;
+    private TestRunner runner;
+
+    @Rule
+    public TestWatcher dumpState = new ListProcessorTestWatcher(
+            () -> {
+                try {
+                    return runner.getStateManager().getState(Scope.LOCAL).toMap();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to retrieve state", e);
+                }
+            },
+            () -> proc.entities,
+            () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
+    );
+
+    @Before
+    public void setup() {
+        proc = new ConcreteListProcessor();
+        runner = TestRunners.newTestRunner(proc);
+    }
 
     @Rule
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
+    /**
+     * <p>Ensures that files are listed when those are old enough:
+     *   <li>Files with last modified timestamp those are old enough to determine that those are completely written
+     *     and no further files are expected to be added with the same timestamp.</li>
+     *   <li>This behavior is expected when a processor is scheduled less frequently, such as hourly or daily.</li>
+     * </p>
+     */
     @Test
     public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
-        final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2);
+        final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS);
 
         // These entries have existed before the processor runs at the first time.
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
         proc.addEntity("name", "id", oldTimestamp);
         proc.addEntity("name", "id2", oldTimestamp);
 
         // First run, the above listed entries should be emitted since it has existed.
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
         runner.clearTransferState();
@@ -83,13 +125,10 @@ public class TestAbstractListProcessor {
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
     }
 
-    @Test
-    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+    private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException {
         runner.run();
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
 
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
@@ -101,20 +140,47 @@ public class TestAbstractListProcessor {
         runner.clearTransferState();
 
         // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        Thread.sleep(getSleepMillis(targetPrecision));
 
         // Run again without introducing any new entries
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
     }
 
+    /**
+     * <p>Ensures that newly created files should wait to confirm there is no more files created with the same timestamp:
+     *   <li>If files have the latest modified timestamp at an iteration, then those should be postponed to be listed</li>
+     *   <li>If those files still are the latest files at the next iteration, then those should be listed</li>
+     * </p>
+     */
     @Test
-    public void testOnlyNewEntriesEmitted() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.run();
+    public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
+     * filesystem only provide timestamp precision in Seconds.
+     */
+    @Test
+    public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
+    }
+
+    /**
+     * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target
+     * filesystem only provide timestamp precision in Minutes.
+     * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
+     */
+    @Ignore
+    @Test
+    public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
+    }
+
+    private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException {
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
 
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
@@ -126,7 +192,7 @@ public class TestAbstractListProcessor {
         runner.clearTransferState();
 
         // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+        Thread.sleep(getSleepMillis(targetPrecision));
 
         // Running again, our two previously seen files are now cleared to be released
         runner.run();
@@ -139,18 +205,20 @@ public class TestAbstractListProcessor {
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
-        proc.addEntity("name", "id3", initialTimestamp - 1);
+        // An entry that is older than already processed entry should not be listed.
+        proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1));
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
+        // If an entry whose timestamp is the same with the last processed timestamp should not be listed.
         proc.addEntity("name", "id2", initialTimestamp);
         runner.run();
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         runner.clearTransferState();
 
         // Now a new file beyond the current time enters
-        proc.addEntity("name", "id2", initialTimestamp + 1);
+        proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1));
 
         // It should show up
         runner.run();
@@ -159,19 +227,36 @@ public class TestAbstractListProcessor {
     }
 
     @Test
+    public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
+    }
+
+    /**
+     * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually.
+     */
+    @Ignore
+    @Test
+    public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
+    }
+
+    @Test
     public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = System.currentTimeMillis();
 
         proc.addEntity("name", "id", initialTimestamp);
         proc.addEntity("name", "id2", initialTimestamp);
 
         // Emulate having state but not having had the processor run such as in a restart
         final Map<String, String> preexistingState = new HashMap<>();
-        preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp));
-        preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
         runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
 
         // run for the first time
@@ -216,37 +301,35 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testStateStoredInClusterStateManagement() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         final DistributedCache cache = new DistributedCache();
         runner.addControllerService("cache", cache);
         runner.enableControllerService(cache);
         runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = System.currentTimeMillis();
 
         proc.addEntity("name", "id", initialTimestamp);
         runner.run();
 
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
+        expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0");
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
 
         Thread.sleep(DEFAULT_SLEEP_MILLIS);
 
         runner.run();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
     @Test
     public void testStateMigratedFromCacheService() throws InitializationException {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         final DistributedCache cache = new DistributedCache();
         runner.addControllerService("cache", cache);
         runner.enableControllerService(cache);
@@ -261,15 +344,13 @@ public class TestAbstractListProcessor {
         final MockStateManager stateManager = runner.getStateManager();
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
         stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
     @Test
     public void testNoStateToMigrate() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
 
         runner.run();
 
@@ -280,8 +361,6 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testStateMigratedFromLocalFile() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
 
         // Create a file that we will populate with the desired state
         File persistenceFile = testFolder.newFile(proc.persistenceFilename);
@@ -305,20 +384,17 @@ public class TestAbstractListProcessor {
         // Verify the state manager now maintains the associated state
         final Map<String, String> expectedState = new HashMap<>();
         // Ensure only timestamp is migrated
-        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
-        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
     @Test
     public void testResumeListingAfterClearingState() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.run();
 
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
 
-        final long initialEventTimestamp = System.nanoTime();
+        final long initialEventTimestamp = System.currentTimeMillis();
         proc.addEntity("name", "id", initialEventTimestamp);
         proc.addEntity("name", "id2", initialEventTimestamp);
 
@@ -350,8 +426,7 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testFetchOnStart() throws InitializationException {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         final DistributedCache cache = new DistributedCache();
         runner.addControllerService("cache", cache);
         runner.enableControllerService(cache);
@@ -364,11 +439,10 @@ public class TestAbstractListProcessor {
 
     @Test
     public void testOnlyNewStateStored() throws Exception {
-        final ConcreteListProcessor proc = new ConcreteListProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
+
         runner.run();
 
-        final long initialTimestamp = System.nanoTime();
+        final long initialTimestamp = System.currentTimeMillis();
 
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
@@ -390,8 +464,8 @@ public class TestAbstractListProcessor {
         final Map<String, String> map = stateMap.toMap();
         // Ensure only timestamp is migrated
         assertEquals(2, map.size());
-        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
-        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
 
         proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();
@@ -403,9 +477,9 @@ public class TestAbstractListProcessor {
         assertEquals(3, updatedStateMap.getVersion());
 
         assertEquals(2, updatedStateMap.toMap().size());
-        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
         // Processed timestamp is now caught up
-        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
     }
 
     private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
@@ -502,7 +576,9 @@ public class TestAbstractListProcessor {
 
         @Override
         protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
-            return Collections.emptyMap();
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), entity.getIdentifier());
+            return attributes;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
index f445588..8204830 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -84,6 +84,7 @@ public class ListFTP extends ListFileTransfer {
         properties.add(FTPTransfer.PROXY_PORT);
         properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
         properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 33d7867..5f2e2d2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -213,6 +213,7 @@ public class ListFile extends AbstractListProcessor<FileInfo> {
         properties.add(MIN_SIZE);
         properties.add(MAX_SIZE);
         properties.add(IGNORE_HIDDEN_FILES);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         this.properties = Collections.unmodifiableList(properties);
 
         final Set<Relationship> relationships = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
index cb5a7e7..b7805e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListSFTP.java
@@ -82,6 +82,7 @@ public class ListSFTP extends ListFileTransfer {
         properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
         properties.add(SFTPTransfer.DATA_TIMEOUT);
         properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
+        properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/28ee7022/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 26dcbbf..1b5b2a4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -30,54 +30,87 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListProcessorTestWatcher;
+import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.Description;
 
 public class TestListFile {
 
-    final String TESTDIR = "target/test/data/in";
-    final File testDir = new File(TESTDIR);
-    ListFile processor;
-    TestRunner runner;
-    ProcessContext context;
+    private final String TESTDIR = "target/test/data/in";
+    private final File testDir = new File(TESTDIR);
+    private ListFile processor;
+    private TestRunner runner;
+    private ProcessContext context;
 
     // Testing factors in milliseconds for file ages that are configured on each run by resetAges()
     // age#millis are relative time references
     // time#millis are absolute time references
     // age#filter are filter label strings for the filter properties
-    Long syncTime = System.currentTimeMillis();
-    Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
-    Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
-    String age0, age1, age2, age3, age4, age5;
-
-    static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2);
+    private Long syncTime = getTestModifiedTime();
+    private Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
+    private Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
+    private String age0, age1, age2, age3, age4, age5;
+
+    @Rule
+    public ListProcessorTestWatcher dumpState = new ListProcessorTestWatcher(
+            () -> {
+                try {
+                    return runner.getStateManager().getState(Scope.LOCAL).toMap();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to retrieve state", e);
+                }
+            },
+            () -> listFiles(testDir).stream()
+                    .map(f -> new FileInfo.Builder().filename(f.getName()).lastModifiedTime(f.lastModified()).build()).collect(Collectors.toList()),
+            () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList())
+    ) {
+        @Override
+        protected void finished(Description description) {
+            try {
+                // In order to refer files in testDir, we want to execute this rule before tearDown, because tearDown removes files.
+                // And @After is always executed before @Rule.
+                tearDown();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to tearDown.", e);
+            }
+        }
+    };
 
     @Before
     public void setUp() throws Exception {
         processor = new ListFile();
         runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, AbstractListProcessor.PRECISION_SECONDS.getValue());
         context = runner.getProcessContext();
         deleteDirectory(testDir);
         assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs());
         resetAges();
     }
 
-    @After
     public void tearDown() throws Exception {
         deleteDirectory(testDir);
         File tempFile = processor.getPersistenceFile();
@@ -91,14 +124,38 @@ public class TestListFile {
         }
     }
 
+    private List<File> listFiles(final File file) {
+        if (file.isDirectory()) {
+            final List<File> result = new ArrayList<>();
+            Optional.ofNullable(file.listFiles()).ifPresent(files -> Arrays.stream(files).forEach(f -> result.addAll(listFiles(f))));
+            return result;
+        } else {
+            return Collections.singletonList(file);
+        }
+    }
+
     /**
      * This method ensures runner clears transfer state,
-     * and sleeps the current thread for DEFAULT_SLEEP_MILLIS before executing runner.run().
+     * and sleeps the current thread for specific period defined at {@link AbstractListProcessor#LISTING_LAG_MILLIS}
+     * based on local filesystem timestamp precision before executing runner.run().
      */
     private void runNext() throws InterruptedException {
         runner.clearTransferState();
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        final List<File> files = listFiles(testDir);
+        final boolean isMillisecondSupported = files.stream().anyMatch(file -> file.lastModified() % 1_000 > 0);
+        final Long lagMillis;
+        if (isMillisecondSupported) {
+            lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS);
+        } else {
+            // Filesystems such as Mac OS X HFS (Hierarchical File System) or EXT3 are known that only support timestamp in seconds precision.
+            lagMillis = AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.SECONDS);
+        }
+        Thread.sleep(lagMillis * 2);
+
+        final long startedAtMillis = System.currentTimeMillis();
         runner.run();
+        dumpState.dumpState(startedAtMillis);
     }
 
     @Test
@@ -305,7 +362,7 @@ public class TestListFile {
 
     @Test
     public void testFilterHidden() throws Exception {
-        final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         FileOutputStream fos;
 
@@ -388,7 +445,7 @@ public class TestListFile {
 
     @Test
     public void testFilterPathPattern() throws Exception {
-        final long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+        final long now = getTestModifiedTime();
 
         final File subdir1 = new File(TESTDIR + "/subdir1");
         assertTrue(subdir1.mkdirs());
@@ -595,20 +652,20 @@ public class TestListFile {
      * Provides "now" minus 1 second in millis
     */
     private static long getTestModifiedTime() {
-        final long nowNanos = System.nanoTime();
+        final long nowMillis = System.currentTimeMillis();
         // Subtract a second to avoid possible rounding issues
-        final long nowSeconds = TimeUnit.SECONDS.convert(nowNanos, TimeUnit.NANOSECONDS) - 1;
+        final long nowSeconds = TimeUnit.SECONDS.convert(nowMillis, TimeUnit.MILLISECONDS) - 1;
         return TimeUnit.MILLISECONDS.convert(nowSeconds, TimeUnit.SECONDS);
     }
 
-    public void resetAges() {
-        syncTime = System.currentTimeMillis();
+    private void resetAges() {
+        syncTime = getTestModifiedTime();
 
         age0millis = 0L;
-        age1millis = 2000L;
-        age2millis = 5000L;
-        age3millis = 7000L;
-        age4millis = 10000L;
+        age1millis = 5000L;
+        age2millis = 10000L;
+        age3millis = 15000L;
+        age4millis = 20000L;
         age5millis = 100000L;
 
         time0millis = syncTime - age0millis;


[2/2] nifi git commit: NIFI-3332: ListXXX to not miss files with the latest processed timestamp

Posted by bb...@apache.org.
NIFI-3332: ListXXX to not miss files with the latest processed timestamp

Before this fix, ListXXX processors could miss files that have the same timestamp as the latest processed timestamp from the previous cycle. Because only timestamps were tracked, it was not possible to determine whether such a file had already been processed.

However, storing every processed identifier, as was done previously, does not perform well.
Instead, this commit makes ListXXX store only the identifiers of entries that have the latest timestamp in a cycle, minimizing the amount of state data stored.

NIFI-3332: ListXXX to not miss files with the latest processed timestamp

- Fixed TestAbstractListProcessor to use appropriate time precision.
  Without this fix, a test could fail arbitrarily if the generated
  timestamp had a zero value in the time unit under test, e.g.
  '10:51:00' was generated while second precision was being tested.
- Fixed TestFTP.basicFileList to use millisecond time precision explicitly
  because FakeFtpServer's time precision is in minutes.
- Changed the junit dependency scope to 'provided' because it is needed by
  ListProcessorTestWatcher, which is shared among different modules.

This closes #1975.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e68ff153
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e68ff153
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e68ff153

Branch: refs/heads/master
Commit: e68ff153e81ddb82d1136d44a96bdb7a70da86d1
Parents: 28ee702
Author: Koji Kawamura <ij...@apache.org>
Authored: Tue Jul 4 17:34:31 2017 +0900
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Aug 28 11:31:04 2017 -0400

----------------------------------------------------------------------
 .../nifi-processor-utils/pom.xml                |  3 +-
 .../util/list/AbstractListProcessor.java        | 73 ++++++++++++++------
 .../util/list/TestAbstractListProcessor.java    | 37 ++++++++--
 .../nifi/processors/standard/TestFTP.java       | 10 ++-
 .../nifi/processors/standard/TestListFile.java  | 40 +++++++++++
 5 files changed, 137 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index dd38e10..a86dda1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -68,9 +68,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <!-- Dependency marked as provided, not test, because ListProcessorTestWatcher uses TestWatcher -->
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <scope>compile</scope>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 8d93a65..52049ed 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -32,6 +32,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -178,6 +179,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     private volatile Long lastRunTimeNanos = 0L;
     private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetState = false;
+    private volatile List<String> latestIdentifiersProcessed = new ArrayList<>();
 
     /*
      * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
@@ -194,6 +196,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
     }
     static final String LATEST_LISTED_ENTRY_TIMESTAMP_KEY = "listing.timestamp";
     static final String LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY = "processed.timestamp";
+    static final String IDENTIFIER_PREFIX = "id";
 
     public File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
@@ -307,6 +310,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 // if the local file's latest timestamp is beyond that of the value provided from the cache, replace
                 if (minTimestamp == null || localTimestamp > minTimestamp) {
                     minTimestamp = localTimestamp;
+                    latestIdentifiersProcessed.clear();
+                    latestIdentifiersProcessed.addAll(listing.getMatchingIdentifiers());
                 }
             }
 
@@ -317,16 +322,20 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         }
 
         if (minTimestamp != null) {
-            persist(minTimestamp, minTimestamp, stateManager, scope);
+            persist(minTimestamp, minTimestamp, latestIdentifiersProcessed, stateManager, scope);
         }
     }
 
     private void persist(final long latestListedEntryTimestampThisCycleMillis,
                          final long lastProcessedLatestEntryTimestampMillis,
+                         final List<String> processedIdentifiesWithLatestTimestamp,
                          final StateManager stateManager, final Scope scope) throws IOException {
-        final Map<String, String> updatedState = new HashMap<>(1);
+        final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2);
         updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis));
         updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis));
+        for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) {
+            updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i));
+        }
         stateManager.setState(updatedState, scope);
     }
 
@@ -350,19 +359,27 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 // Attempt to retrieve state from the state manager if a last listing was not yet established or
                 // if just elected the primary node
                 final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
-                final String latestListedEntryTimestampString = stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY);
-                final String lastProcessedLatestEntryTimestampString= stateMap.get(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY);
-                if (lastProcessedLatestEntryTimestampString != null) {
-                    this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(lastProcessedLatestEntryTimestampString);
-                }
-                if (latestListedEntryTimestampString != null) {
-                    minTimestampToListMillis = Long.parseLong(latestListedEntryTimestampString);
-                    // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
-                    if (minTimestampToListMillis == this.lastListedLatestEntryTimestampMillis) {
-                        context.yield();
-                        return;
-                    } else {
-                        this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
+                latestIdentifiersProcessed.clear();
+                for (Map.Entry<String, String> state : stateMap.toMap().entrySet()) {
+                    final String k = state.getKey();
+                    final String v = state.getValue();
+                    if (v == null || v.isEmpty()) {
+                        continue;
+                    }
+
+                    if (LATEST_LISTED_ENTRY_TIMESTAMP_KEY.equals(k)) {
+                        minTimestampToListMillis = Long.parseLong(v);
+                        // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
+                        if (minTimestampToListMillis.equals(this.lastListedLatestEntryTimestampMillis)) {
+                            context.yield();
+                            return;
+                        } else {
+                            this.lastListedLatestEntryTimestampMillis = minTimestampToListMillis;
+                        }
+                    } else if (LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY.equals(k)) {
+                        this.lastProcessedLatestEntryTimestampMillis = Long.parseLong(v);
+                    } else if (k.startsWith(IDENTIFIER_PREFIX)) {
+                        latestIdentifiersProcessed.add(v);
                     }
                 }
                 justElectedPrimaryNode = false;
@@ -405,7 +422,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
             }
             // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis > lastProcessedLatestEntryTimestampMillis;
+            final boolean newEntry = minTimestampToListMillis == null || entityTimestampMillis >= minTimestampToListMillis && entityTimestampMillis >= lastProcessedLatestEntryTimestampMillis;
 
             if (newEntry) {
                 List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
@@ -439,7 +456,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                  *   - The latest listed entity timestamp is equal to the last processed time, meaning we handled those items originally passed over. No need to process it again.
                  */
                 final long  listingLagNanos = TimeUnit.MILLISECONDS.toNanos(listingLagMillis);
-                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos || latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)) {
+                if (currentRunTimeNanos - lastRunTimeNanos < listingLagNanos
+                        || (latestListedEntryTimestampThisCycleMillis.equals(lastProcessedLatestEntryTimestampMillis)
+                            && orderedEntries.get(latestListedEntryTimestampThisCycleMillis).stream()
+                                    .allMatch(entity -> latestIdentifiersProcessed.contains(entity.getIdentifier())))) {
                     context.yield();
                     return;
                 }
@@ -456,9 +476,14 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 }
             }
 
+            for (Map.Entry<Long, List<T>> timestampEntities : orderedEntries.entrySet()) {
+                List<T> entities = timestampEntities.getValue();
+                if (timestampEntities.getKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // Filter out previously processed entities.
+                    entities = entities.stream().filter(entity -> !latestIdentifiersProcessed.contains(entity.getIdentifier())).collect(Collectors.toList());
+                }
 
-            for (List<T> timestampEntities : orderedEntries.values()) {
-                for (T entity : timestampEntities) {
+                for (T entity : entities) {
                     // Create the FlowFile for this path.
                     final Map<String, String> attributes = createAttributes(entity, context);
                     FlowFile flowFile = session.create();
@@ -476,6 +501,13 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 // If there have been files created, update the last timestamp we processed.
                 // Retrieving lastKey instead of using latestListedEntryTimestampThisCycleMillis is intentional here,
                 // because latestListedEntryTimestampThisCycleMillis might be removed if it's not old enough.
+                if (!orderedEntries.lastKey().equals(lastProcessedLatestEntryTimestampMillis)) {
+                    // If the latest timestamp at this cycle becomes different than the previous one, we need to clear identifiers.
+                    // If it didn't change, we need to add identifiers.
+                    latestIdentifiersProcessed.clear();
+                }
+                // Capture latestIdentifierProcessed.
+                latestIdentifiersProcessed.addAll(orderedEntries.lastEntry().getValue().stream().map(T::getIdentifier).collect(Collectors.toList()));
                 lastProcessedLatestEntryTimestampMillis = orderedEntries.lastKey();
                 getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
                 session.commit();
@@ -494,7 +526,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                 // the distributed state cache, the node can continue to run (if it is primary node).
                 try {
                     lastListedLatestEntryTimestampMillis = latestListedEntryTimestampThisCycleMillis;
-                    persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, context.getStateManager(), getStateScope(context));
+                    persist(latestListedEntryTimestampThisCycleMillis, lastProcessedLatestEntryTimestampMillis, latestIdentifiersProcessed, context.getStateManager(), getStateScope(context));
                 } catch (final IOException ioe) {
                     getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
                         + "if another node begins executing this Processor, data duplication may occur.", ioe);
@@ -518,6 +550,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         lastListedLatestEntryTimestampMillis = null;
         lastProcessedLatestEntryTimestampMillis = 0L;
         lastRunTimeNanos = 0L;
+        latestIdentifiersProcessed.clear();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 69705f2..1ecbce7 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.processor.util.list;
 
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
@@ -130,6 +134,8 @@ public class TestAbstractListProcessor {
 
         final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
 
+        setTargetSystemTimestampPrecision(targetPrecision);
+
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
         proc.addEntity("name", "id2", initialTimestamp);
@@ -182,6 +188,8 @@ public class TestAbstractListProcessor {
 
         final long initialTimestamp = getCurrentTimestampMillis(targetPrecision);
 
+        setTargetSystemTimestampPrecision(targetPrecision);
+
         runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
         proc.addEntity("name", "id", initialTimestamp);
         proc.addEntity("name", "id2", initialTimestamp);
@@ -226,6 +234,20 @@ public class TestAbstractListProcessor {
         runner.clearTransferState();
     }
 
+    private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
+        switch (targetPrecision) {
+            case MINUTES:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES);
+                break;
+            case SECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS);
+                break;
+            case MILLISECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS);
+                break;
+        }
+    }
+
     @Test
     public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
         testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
@@ -257,6 +279,8 @@ public class TestAbstractListProcessor {
         final Map<String, String> preexistingState = new HashMap<>();
         preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
         preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
+        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2");
         runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
 
         // run for the first time
@@ -324,6 +348,7 @@ public class TestAbstractListProcessor {
         // Ensure only timestamp is migrated
         expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
         expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
@@ -383,9 +408,10 @@ public class TestAbstractListProcessor {
 
         // Verify the state manager now maintains the associated state
         final Map<String, String> expectedState = new HashMap<>();
-        // Ensure only timestamp is migrated
+        // Ensure timestamp and identifies are migrated
         expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492");
         expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id");
         runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
@@ -462,10 +488,12 @@ public class TestAbstractListProcessor {
         assertEquals(2, stateMap.getVersion());
 
         final Map<String, String> map = stateMap.toMap();
-        // Ensure only timestamp is migrated
-        assertEquals(2, map.size());
+        // Ensure timestamp and identifiers are migrated
+        assertEquals(4, map.size());
         assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
         assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+        assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
+        assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1"));
 
         proc.addEntity("new name", "new id", initialTimestamp + 1);
         runner.run();
@@ -476,10 +504,11 @@ public class TestAbstractListProcessor {
         StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
         assertEquals(3, updatedStateMap.getVersion());
 
-        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(3, updatedStateMap.toMap().size());
         assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
         // Processed timestamp is now caught up
         assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+        assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
     }
 
     private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
index d8797dc..96a4236 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FTPTransfer;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
@@ -41,6 +42,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 
@@ -201,7 +203,7 @@ public class TestFTP {
     }
 
     @Test
-    public void basicFileList() throws IOException {
+    public void basicFileList() throws IOException, InterruptedException {
         FileSystem results = fakeFtpServer.getFileSystem();
 
         FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2");
@@ -217,10 +219,16 @@ public class TestFTP {
         runner.setProperty(FTPTransfer.PASSWORD, password);
         runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort));
         runner.setProperty(ListFTP.REMOTE_PATH, "/");
+        // FakeFTPServer has timestamp precision in minutes.
+        // Specify milliseconds precision so that test does not need to wait for minutes.
+        runner.setProperty(ListFile.TARGET_SYSTEM_TIMESTAMP_PRECISION, ListFile.PRECISION_MILLIS);
         runner.assertValid();
 
+        // Ensure wait for enough lag time.
+        Thread.sleep(AbstractListProcessor.LISTING_LAG_MILLIS.get(TimeUnit.MILLISECONDS) * 2);
         runner.run();
 
+        runner.assertTransferCount(FetchFTP.REL_SUCCESS, 1);
         final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0);
         runner.assertAllFlowFilesContainAttribute("ftp.remote.host");
         runner.assertAllFlowFilesContainAttribute("ftp.remote.port");

http://git-wip-us.apache.org/repos/asf/nifi/blob/e68ff153/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
index 1b5b2a4..bf2755b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java
@@ -33,8 +33,10 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -646,6 +648,44 @@ public class TestListFile {
         assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
     }
 
+    private void makeTestFile(final String name, final long millis, final Map<String, Long> fileTimes) throws IOException {
+        final File file = new File(TESTDIR + name);
+        assertTrue(file.createNewFile());
+        assertTrue(file.setLastModified(millis));
+        fileTimes.put(file.getName(), file.lastModified());
+    }
+
+    @Test
+    public void testFilterRunMidFileWrites() throws Exception {
+        final Map<String, Long> fileTimes = new HashMap<>();
+
+        runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
+
+        makeTestFile("/batch1-age3.txt", time3millis, fileTimes);
+        makeTestFile("/batch1-age4.txt", time4millis, fileTimes);
+        makeTestFile("/batch1-age5.txt", time5millis, fileTimes);
+
+        // check files
+        runNext();
+
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 3);
+        assertEquals(3, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
+
+        // should be picked since it's newer than age3
+        makeTestFile("/batch2-age2.txt", time2millis, fileTimes);
+        // should be picked even if it has the same age3 timestamp, because it wasn't there at the previous cycle.
+        makeTestFile("/batch2-age3.txt", time3millis, fileTimes);
+        // should be ignored since it's older than age3
+        makeTestFile("/batch2-age4.txt", time4millis, fileTimes);
+
+        runNext();
+
+        runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
+        runner.assertTransferCount(ListFile.REL_SUCCESS, 2);
+        assertEquals(2, runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS).size());
+    }
+
     /*
      * HFS+, default for OS X, only has granularity to one second, accordingly, we go back in time to establish consistent test cases
      *