You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2017/05/02 18:41:01 UTC

[4/5] nifi git commit: NIFI-1833 - Addressed issues from PR review.

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/EntityListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/EntityListing.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/EntityListing.java
new file mode 100644
index 0000000..a815306
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/EntityListing.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processor.util.list;
+
+import java.util.Collection;
+import java.util.Date;
+
+import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * A simple POJO that holds state about the most recent listing performed by an AbstractListProcessor, so that
+ * we can avoid pulling the same file multiple times.
+ */
+@XmlType(name = "listing")
+public class EntityListing {
+
+    private Date latestTimestamp;
+    private Collection<String> matchingIdentifiers;
+
+    /**
+     * @return the modification date of the newest file that was contained in the listing
+     */
+    public Date getLatestTimestamp() {
+        return latestTimestamp;
+    }
+
+    /**
+     * Sets the modification date of the newest file that was contained in the listing.
+     *
+     * @param latestTimestamp the modification date of the newest file that was contained in the listing
+     */
+    public void setLatestTimestamp(Date latestTimestamp) {
+        this.latestTimestamp = latestTimestamp;
+    }
+
+    /**
+     * @return a Collection containing the identifiers of all entities in the listing whose timestamp
+     *         was equal to {@link #getLatestTimestamp()}
+     */
+    @XmlTransient // keeps this property out of the JAXB XML mapping of the class
+    public Collection<String> getMatchingIdentifiers() {
+        return matchingIdentifiers;
+    }
+
+    /**
+     * Sets the Collection containing the identifiers of all entities in the listing whose timestamp was
+     * equal to {@link #getLatestTimestamp()}. The given Collection is stored as-is (no defensive copy).
+     *
+     * @param matchingIdentifiers the identifiers whose last modified date matches the latest timestamp
+     */
+    public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) {
+        this.matchingIdentifiers = matchingIdentifiers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
new file mode 100644
index 0000000..3c7c08d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processor.util.list;
+
+public interface ListableEntity {
+
+    /**
+     * @return the name of the remote entity
+     */
+    String getName();
+
+    /**
+     * @return the identifier of the remote entity. This may or may not be the same as the name of the
+     *         entity but should be unique across all entities.
+     */
+    String getIdentifier();
+
+
+    /**
+     * @return the timestamp for this entity, used to avoid listing the same entities multiple times
+     *         (the unit/epoch is not specified by this interface - TODO confirm against AbstractListProcessor)
+     */
+    long getTimestamp();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
new file mode 100644
index 0000000..2417d52
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processor.util.list;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.Charsets;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAbstractListProcessor {
+
+    static final long DEFAULT_SLEEP_MILLIS = TimeUnit.NANOSECONDS.toMillis(AbstractListProcessor.LISTING_LAG_NANOS * 2); // twice the listing lag, in milliseconds
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(); // scratch folder; used by testStateMigratedFromLocalFile
+
+    @Test
+    public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception {
+        final long oldTimestamp = System.nanoTime() - (AbstractListProcessor.LISTING_LAG_NANOS * 2);
+
+        // These entries existed before the processor ran for the first time (timestamped two lag periods in the past).
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        proc.addEntity("name", "id", oldTimestamp);
+        proc.addEntity("name", "id2", oldTimestamp);
+
+        // First run: both entries listed above should be emitted since they already existed.
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Run again without introducing any new entries; nothing further should be emitted
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testPreviouslySkippedEntriesEmittedOnNextIteration() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        final long initialTimestamp = System.nanoTime();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+
+        // On the run right after they are added, the entries above are skipped to avoid write synchronization issues
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Run again without introducing any new entries; the two previously skipped entries are now emitted
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+    }
+
+    @Test
+    public void testOnlyNewEntriesEmitted() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        final long initialTimestamp = System.nanoTime();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+
+        // On the run right after they are added, the entries above are skipped to avoid write synchronization issues
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Running again, our two previously seen files are now cleared to be released
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify that listing an already-emitted entry again (same identifier, same timestamp) emits nothing
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", initialTimestamp - 1); // older than the entries already emitted: must not be emitted
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Now an entry with a timestamp newer than everything emitted so far enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);
+
+        // It should show up
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        final long initialTimestamp = System.nanoTime();
+
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        // Emulate having state but not having had the processor run such as in a restart
+        final Map<String, String> preexistingState = new HashMap<>();
+        preexistingState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, Long.toString(initialTimestamp));
+        runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
+
+        // run for the first time
+        runner.run();
+
+        // First run: the entries above are skipped - presumably because the pre-existing state already marks their timestamp as processed
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Running again after the lag period, the entries must still be skipped
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Verify that listing an already-handled entry again (same identifier, same timestamp) emits nothing
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", initialTimestamp - 1); // older than the stored timestamps: must not be emitted
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Now an entry with a timestamp newer than the stored timestamps enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);
+
+        // It should now show up
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testStateStoredInClusterStateManagement() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        final long initialTimestamp = System.nanoTime();
+
+        proc.addEntity("name", "id", initialTimestamp);
+        runner.run();
+
+        final Map<String, String> expectedState = new HashMap<>();
+        // After the first run the listing timestamp is recorded, but the processed timestamp is still "0"
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "0");
+        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
+
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        // After the lag period and another run, the processed timestamp has caught up with the listing timestamp
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, String.valueOf(initialTimestamp));
+        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testStateMigratedFromCacheService() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        final String serviceState = "{\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; // legacy JSON listing seeded into the cache
+        final String cacheKey = runner.getProcessor().getIdentifier() + ".lastListingTime./path"; // "/path" matches ConcreteListProcessor.getPath()
+        cache.stored.put(cacheKey, serviceState);
+
+        runner.run();
+
+        final MockStateManager stateManager = runner.getStateManager();
+        final Map<String, String> expectedState = new HashMap<>();
+        // Ensure only the timestamp is migrated (the matching identifiers do not appear in the state)
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+        stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testNoStateToMigrate() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run();
+
+        final MockStateManager stateManager = runner.getStateManager();
+        final Map<String, String> expectedState = new HashMap<>(); // left empty: with nothing to migrate, no state is expected
+        stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testStateMigratedFromLocalFile() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // Create a file that we will populate with the desired state
+        final File persistenceFile = testFolder.newFile(proc.persistenceFilename);
+        // Override the processor's internal persistence file
+        proc.persistenceFile = persistenceFile;
+
+        // Local File persistence was a properties file format of <key>=<JSON entity listing representation>
+        // Our ConcreteListProcessor is centered around files which are provided for a given path
+        final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
+
+        // Create a persistence file of the format anticipated
+        try (FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+            fos.write(serviceState.getBytes(Charsets.UTF_8));
+        }
+
+        runner.run();
+
+        // Verify the local persistence file is removed
+        Assert.assertFalse("Failed to remove persistence file", persistenceFile.exists());
+
+        // Verify the state manager now maintains the associated state
+        final Map<String, String> expectedState = new HashMap<>();
+        // Ensure only the timestamp is migrated (the matching identifiers do not appear in the state)
+        expectedState.put(AbstractListProcessor.LISTING_TIMESTAMP_KEY, "1492");
+        expectedState.put(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY, "1492");
+        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testResumeListingAfterClearingState() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        final long initialEventTimestamp = System.nanoTime();
+        proc.addEntity("name", "id", initialEventTimestamp);
+        proc.addEntity("name", "id2", initialEventTimestamp);
+
+        // Add entities but these should not be transferred as they are the latest values
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // After providing a pause in listings, the files should now transfer
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify entities are not transferred again for the given state
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Clear state for this processor, eradicating timestamp
+        runner.getStateManager().clear(Scope.CLUSTER);
+        Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
+
+        // With the state gone, the original entities are transferred again on the next run
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testFetchOnStart() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache");
+
+        runner.run();
+
+        assertEquals(1, cache.fetchCount); // a single run must call the cache's get() exactly once
+    }
+
+    @Test
+    public void testOnlyNewStateStored() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.run();
+
+        final long initialTimestamp = System.nanoTime();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(2, stateMap.getVersion());
+
+        final Map<String, String> map = stateMap.toMap();
+        // Ensure the state holds only the two timestamp entries
+        assertEquals(2, map.size());
+        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+
+        proc.addEntity("new name", "new id", initialTimestamp + 1);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(3, updatedStateMap.getVersion());
+
+        assertEquals(2, updatedStateMap.toMap().size());
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LISTING_TIMESTAMP_KEY));
+        // Processed timestamp is now caught up
+        assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.PROCESSED_TIMESTAMP_KEY));
+    }
+
+    /** In-memory {@link DistributedMapCacheClient} stub for these tests: entries live unserialized in a HashMap and get() calls are counted. */
+    private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient {
+        private final Map<Object, Object> stored = new HashMap<>();
+        private int fetchCount = 0; // incremented on every get(); read directly by testFetchOnStart
+
+        @Override
+        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            return false; // stub: stores nothing
+        }
+
+        @Override
+        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+            return null; // stub: stores nothing
+        }
+
+        @Override
+        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+            return false; // stub: does not consult the backing map
+        }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            stored.put(key, value);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+            fetchCount++;
+            return (V) stored.get(key);
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+            final Object value = stored.remove(key);
+            return value != null;
+        }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            // Collect the matching KEYS first (the key set must not be mutated while iterating), then remove by key.
+            final Pattern pattern = Pattern.compile(regex);
+            final List<Object> matchedKeys = new ArrayList<>();
+            for (final Object key : stored.keySet()) {
+                final Matcher matcher = pattern.matcher(key.toString()); // keys must be representable as a String
+                if (matcher.matches()) {
+                    matchedKeys.add(key);
+                }
+            }
+            matchedKeys.forEach(stored::remove);
+            return matchedKeys.size();
+        }
+    }
+
+
+    private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
+        private final List<ListableEntity> entities = new ArrayList<>(); // everything registered via addEntity(); returned by performListing()
+
+        public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json";
+        public String persistenceFolder = "target/";
+        public File persistenceFile = new File(persistenceFolder + persistenceFilename); // non-final: testStateMigratedFromLocalFile swaps in a temp file
+
+        @Override
+        public File getPersistenceFile() {
+            return persistenceFile;
+        }
+
+        public void addEntity(final String name, final String identifier, final long timestamp) { // appends; an existing identifier is not replaced
+            final ListableEntity entity = new ListableEntity() {
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public String getIdentifier() {
+                    return identifier;
+                }
+
+                @Override
+                public long getTimestamp() {
+                    return timestamp;
+                }
+            };
+
+            entities.add(entity);
+        }
+
+        @Override
+        protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) {
+            return Collections.emptyMap(); // no attributes are produced for test entities
+        }
+
+        @Override
+        protected String getPath(final ProcessContext context) {
+            return "/path"; // fixed path; the migration tests build their legacy state keys from it
+        }
+
+        @Override
+        protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+            return Collections.unmodifiableList(entities); // minTimestamp is ignored: every registered entity is always listed
+        }
+
+        @Override
+        protected boolean isListingResetNecessary(PropertyDescriptor property) {
+            return false;
+        }
+
+        @Override
+        protected Scope getStateScope(final ProcessContext context) {
+            return Scope.CLUSTER;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
deleted file mode 100644
index eceee1d..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.behavior.Stateful;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.distributed.cache.client.Deserializer;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.standard.util.EntityListing;
-import org.apache.nifi.processors.standard.util.ListableEntity;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-/**
- * <p>
- * An Abstract Processor that is intended to simplify the coding required in order to perform Listing operations of remote resources.
- * Those remote resources may be files, "objects", "messages", or any other sort of entity that may need to be listed in such a way that
- * we identify the entity only once. Each of these objects, messages, etc. is referred to as an "entity" for the scope of this Processor.
- * </p>
- * <p>
- * This class is responsible for triggering the listing to occur, filtering the results returned such that only new (unlisted) entities
- * or entities that have been modified will be emitted from the Processor.
- * </p>
- * <p>
- * In order to make use of this abstract class, the entities listed must meet the following criteria:
- * </p>
- * <ul>
- * <li>
- * Entity must have a timestamp associated with it. This timestamp is used to determine if entities are "new" or not. Any entity that is
- * returned by the listing will be considered "new" if the timestamp is later than the latest timestamp pulled.
- * </li>
- * <li>
- * If the timestamp of an entity is before OR equal to the latest timestamp pulled, then the entity is not considered new. If the timestamp is later
- * than the last timestamp pulled, then the entity is considered new.
- * </li>
- * <li>
- * Entity must have a user-readable name that can be used for logging purposes.
- * </li>
- * </ul>
- * <p>
- * This class persists state across restarts so that even if NiFi is restarted, duplicates will not be pulled from the target system given the above criteria. This is
- * performed using the {@link StateManager}. This allows the system to be restarted and begin processing where it left off. The state that is stored is the latest timestamp
- * that has been pulled (as determined by the timestamps of the entities that are returned). See the section above for information about how this information is used in order to
- * determine new entities.
- * </p>
- * <p>
- * NOTE: This processor performs migrations of legacy state mechanisms inclusive of locally stored, file-based state and the optional utilization of the <code>Distributed Cache
- * Service</code> property to the new {@link StateManager} functionality. Upon successful migration, the associated data from one or both of the legacy mechanisms is purged.
- * </p>
- * <p>
- * For each new entity that is listed, the Processor will send a FlowFile to the 'success' relationship. The FlowFile will have no content but will have some set
- * of attributes (defined by the concrete implementation) that can be used to fetch those remote resources or interact with them in whatever way makes sense for
- * the configured dataflow.
- * </p>
- * <p>
- * Subclasses are responsible for the following:
- * </p>
- * <ul>
- * <li>
- * Perform a listing of remote resources. The subclass will implement the {@link #performListing(ProcessContext, Long)} method, which creates a listing of all
- * entities on the remote system that have timestamps later than the provided timestamp. If the entities returned have a timestamp before the provided one, those
- * entities will be filtered out. It is therefore not necessary to perform the filtering of timestamps but is provided in order to give the implementation the ability
- * to filter those resources on the server side rather than pulling back all of the information, if it makes sense to do so in the concrete implementation.
- * </li>
- * <li>
- * Creating a Map of attributes that are applicable for an entity. The attributes that are assigned to each FlowFile are exactly those returned by the
- * {@link #createAttributes(ListableEntity, ProcessContext)}.
- * </li>
- * <li>
- * Returning the configured path. Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
- * within that path. The {@link #getPath(ProcessContext)} method is responsible for returning the path that is currently being polled for entities. If this concept
- * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
- * </li>
- * <li>
- * Determining when the listing must be cleared. It is sometimes necessary to clear state about which entities have already been ingested, as the result of a user
- * changing a property value. The {@link #isListingResetNecessary(PropertyDescriptor)} method is responsible for determining when the listing needs to be reset by returning
- * a boolean indicating whether or not a change in the value of the provided property should trigger the timestamp and identifier information to be cleared.
- * </li>
- * </ul>
- */
-@TriggerSerially
-@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "After a listing of resources is performed, the latest timestamp of any of the resources is stored in the component's state. "
-    + "The scope used depends on the implementation.")
-public abstract class AbstractListProcessor<T extends ListableEntity> extends AbstractProcessor {
-
-    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
-        .name("Distributed Cache Service")
-        .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from the remote server so that if a new node "
-            + "begins pulling data, it won't duplicate all of the work that has been done. If not specified, the information will not be shared across the cluster. "
-            + "This property does not need to be set for standalone instances of NiFi but should be configured if NiFi is run within a cluster.")
-        .required(false)
-        .identifiesControllerService(DistributedMapCacheClient.class)
-        .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles that are received are routed to success")
-        .build();
-
-    private volatile Long lastListingTime = null;
-    private volatile Long lastProcessedTime = 0L;
-    private volatile Long lastRunTime = 0L;
-    private volatile boolean justElectedPrimaryNode = false;
-    private volatile boolean resetState = false;
-
-    /*
-     * A constant used in determining an internal "yield" of processing files. Given the logic to provide a pause on the newest
-     * files according to timestamp, it is ensured that at least the specified number of milliseconds has elapsed to avoid getting scheduled
-     * near instantaneously after the prior iteration effectively voiding the built in buffer
-     */
-    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
-    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
-    static final String PROCESSED_TIMESTAMP_KEY = "processed.timestamp";
-
-    protected File getPersistenceFile() {
-        return new File("conf/state/" + getIdentifier());
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(DISTRIBUTED_CACHE_SERVICE);
-        return properties;
-    }
-
-    @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
-            resetTimeStates(); // clear lastListingTime so that we have to fetch new time
-            resetState = true;
-        }
-    }
-
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        return relationships;
-    }
-
-    @OnPrimaryNodeStateChange
-    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
-    }
-
-    @OnScheduled
-    public final void updateState(final ProcessContext context) throws IOException {
-        final String path = getPath(context);
-        final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
-        // Check if state already exists for this path. If so, we have already migrated the state.
-        final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
-        if (stateMap.getVersion() == -1L) {
-            try {
-                // Migrate state from the old way of managing state (distributed cache service and local file)
-                // to the new mechanism (State Manager).
-                migrateState(path, client, context.getStateManager(), getStateScope(context));
-            } catch (final IOException ioe) {
-                throw new IOException("Failed to properly migrate state to State Manager", ioe);
-            }
-        }
-
-        // When scheduled to run, check if the associated timestamp is null, signifying a clearing of state and reset the internal timestamp
-        if (lastListingTime != null && stateMap.get(LISTING_TIMESTAMP_KEY) == null) {
-            getLogger().info("Detected that state was cleared for this component.  Resetting internal values.");
-            resetTimeStates();
-        }
-
-        if (resetState) {
-            context.getStateManager().clear(getStateScope(context));
-            resetState = false;
-        }
-    }
-
-    /**
-     * This processor used to use the DistributedMapCacheClient in order to store cluster-wide state, before the introduction of
-     * the StateManager. This method will migrate state from that DistributedMapCacheClient, or from a local file, to the StateManager,
-     * if any state already exists. More specifically, this will extract out the relevant timestamp for when the processor last ran
-     *
-     * @param path         the path to migrate state for
-     * @param client       the DistributedMapCacheClient that is capable of obtaining the current state
-     * @param stateManager the StateManager to use in order to store the new state
-     * @param scope        the scope to use
-     * @throws IOException if unable to retrieve or store the state
-     */
-    private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException {
-        Long minTimestamp = null;
-
-        // Retrieve state from Distributed Cache Client, establishing the latest file seen
-        if (client != null) {
-            final StringSerDe serde = new StringSerDe();
-            final String serializedState = client.get(getKey(path), serde, serde);
-            if (serializedState != null && !serializedState.isEmpty()) {
-                final EntityListing listing = deserialize(serializedState);
-                minTimestamp = listing.getLatestTimestamp().getTime();
-            }
-
-            // remove entry from distributed cache server
-            if (client != null) {
-                try {
-                    client.remove(path, new StringSerDe());
-                } catch (final IOException ioe) {
-                    getLogger().warn("Failed to remove entry from Distributed Cache Service. However, the state has already been migrated to use the new "
-                        + "State Management service, so the Distributed Cache Service is no longer needed.");
-                }
-            }
-        }
-
-        // Retrieve state from locally persisted file, and compare these to the minTimestamp established from the distributedCache, if there was one
-        final File persistenceFile = getPersistenceFile();
-        if (persistenceFile.exists()) {
-            final Properties props = new Properties();
-
-            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
-                props.load(fis);
-            }
-
-            final String locallyPersistedValue = props.getProperty(path);
-            if (locallyPersistedValue != null) {
-                final EntityListing listing = deserialize(locallyPersistedValue);
-                final long localTimestamp = listing.getLatestTimestamp().getTime();
-                // if the local file's latest timestamp is beyond that of the value provided from the cache, replace
-                if (minTimestamp == null || localTimestamp > minTimestamp) {
-                    minTimestamp = localTimestamp;
-                }
-            }
-
-            // delete the local file, since it is no longer needed
-            if (persistenceFile.exists() && !persistenceFile.delete()) {
-                getLogger().warn("Migrated state but failed to delete local persistence file");
-            }
-        }
-
-        if (minTimestamp != null) {
-            persist(minTimestamp, minTimestamp, stateManager, scope);
-        }
-    }
-
-    private void persist(final long listingTimestamp, final long processedTimestamp, final StateManager stateManager, final Scope scope) throws IOException {
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, String.valueOf(listingTimestamp));
-        updatedState.put(PROCESSED_TIMESTAMP_KEY, String.valueOf(processedTimestamp));
-        stateManager.setState(updatedState, scope);
-    }
-
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
-    private EntityListing deserialize(final String serializedState) throws JsonParseException, JsonMappingException, IOException {
-        final ObjectMapper mapper = new ObjectMapper();
-        final JsonNode jsonNode = mapper.readTree(serializedState);
-        return mapper.readValue(jsonNode, EntityListing.class);
-    }
-
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        Long minTimestamp = lastListingTime;
-
-        if (this.lastListingTime == null || this.lastProcessedTime == null || justElectedPrimaryNode) {
-            try {
-                // Attempt to retrieve state from the state manager if a last listing was not yet established or
-                // if just elected the primary node
-                final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
-                final String listingTimestampString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                final String lastProcessedString= stateMap.get(PROCESSED_TIMESTAMP_KEY);
-                if (lastProcessedString != null) {
-                    this.lastProcessedTime = Long.parseLong(lastProcessedString);
-                }
-                if (listingTimestampString != null) {
-                    minTimestamp = Long.parseLong(listingTimestampString);
-                    // If our determined timestamp is the same as that of our last listing, skip this execution as there are no updates
-                    if (minTimestamp == this.lastListingTime) {
-                        context.yield();
-                        return;
-                    } else {
-                        this.lastListingTime = minTimestamp;
-                    }
-                }
-                justElectedPrimaryNode = false;
-            } catch (final IOException ioe) {
-                getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
-                context.yield();
-                return;
-            }
-        }
-
-        final List<T> entityList;
-        final long currentListingTimestamp = System.nanoTime();
-        try {
-            // track of when this last executed for consideration of the lag nanos
-            entityList = performListing(context, minTimestamp);
-        } catch (final IOException e) {
-            getLogger().error("Failed to perform listing on remote host due to {}", e);
-            context.yield();
-            return;
-        }
-
-        if (entityList == null || entityList.isEmpty()) {
-            context.yield();
-            return;
-        }
-
-        Long latestListingTimestamp = null;
-        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
-
-        // Build a sorted map to determine the latest possible entries
-        for (final T entity : entityList) {
-            final long entityTimestamp = entity.getTimestamp();
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = minTimestamp == null || entityTimestamp >= minTimestamp && entityTimestamp > lastProcessedTime;
-
-            if (newEntry) {
-                List<T> entitiesForTimestamp = orderedEntries.get(entity.getTimestamp());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<T>();
-                    orderedEntries.put(entity.getTimestamp(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(entity);
-            }
-        }
-
-        int flowfilesCreated = 0;
-
-        if (orderedEntries.size() > 0) {
-            latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp.equals(lastListingTime)) {
-                /* We are done when either:
-                 *   - we have not yet passed the minimal listing lag needed, due to being triggered too soon after the last run
-                 *   - the latest listing timestamp is equal to the last processed time, meaning we handled those items originally passed over
-                 */
-                if (System.nanoTime() - lastRunTime < LISTING_LAG_NANOS || latestListingTimestamp.equals(lastProcessedTime)) {
-                    context.yield();
-                    return;
-                }
-
-            } else if (latestListingTimestamp >= currentListingTimestamp - LISTING_LAG_NANOS) {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<T> timestampEntities : orderedEntries.values()) {
-                for (T entity : timestampEntities) {
-                    // Create the FlowFile for this path.
-                    final Map<String, String> attributes = createAttributes(entity, context);
-                    FlowFile flowFile = session.create();
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                    session.transfer(flowFile, REL_SUCCESS);
-                    flowfilesCreated++;
-                }
-            }
-        }
-
-        // As long as we have a listing timestamp, there is meaningful state to capture regardless of any outputs generated
-        if (latestListingTimestamp != null) {
-            boolean processedNewFiles = flowfilesCreated > 0;
-            if (processedNewFiles) {
-                // If there have been files created, update the last timestamp we processed
-                lastProcessedTime = orderedEntries.lastKey();
-                getLogger().info("Successfully created listing with {} new objects", new Object[]{flowfilesCreated});
-                session.commit();
-            }
-
-            lastRunTime = System.nanoTime();
-
-            if (!latestListingTimestamp.equals(lastListingTime) || processedNewFiles) {
-                // We have performed a listing and pushed any FlowFiles out that may have been generated
-                // Now, we need to persist state about the Last Modified timestamp of the newest file
-                // that we evaluated. We do this in order to avoid pulling in the same file twice.
-                // However, we want to save the state both locally and remotely.
-                // We store the state remotely so that if a new Primary Node is chosen, it can pick up where the
-                // previously Primary Node left off.
-                // We also store the state locally so that if the node is restarted, and the node cannot contact
-                // the distributed state cache, the node can continue to run (if it is primary node).
-                try {
-                    lastListingTime = latestListingTimestamp;
-                    persist(latestListingTimestamp, lastProcessedTime, context.getStateManager(), getStateScope(context));
-                } catch (final IOException ioe) {
-                    getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
-                        + "if another node begins executing this Processor, data duplication may occur.", ioe);
-                }
-            }
-
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-
-            // lastListingTime = 0 so that we don't continually poll the distributed cache / local file system
-            if (lastListingTime == null) {
-                lastListingTime = 0L;
-            }
-
-            return;
-        }
-    }
-
-    private void resetTimeStates() {
-        lastListingTime = null;
-        lastProcessedTime = 0L;
-        lastRunTime = 0L;
-    }
-
-    /**
-     * Creates a Map of attributes that should be applied to the FlowFile to represent this entity. This processor will emit a FlowFile for each "new" entity
-     * (see the documentation for this class for a discussion of how this class determines whether or not an entity is "new"). The FlowFile will contain no
-     * content. The attributes that will be included are exactly the attributes that are returned by this method.
-     *
-     * @param entity  the entity represented by the FlowFile
-     * @param context the ProcessContext for obtaining configuration information
-     * @return a Map of attributes for this entity
-     */
-    protected abstract Map<String, String> createAttributes(T entity, ProcessContext context);
-
-    /**
-     * Returns the path to perform a listing on.
-     * Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as well as name or identifier that is unique only
-     * within that path. This method is responsible for returning the path that is currently being polled for entities. If this concept
-     * does not apply for the concrete implementation, it is recommended that the concrete implementation return "." or "/" for all invocations of this method.
-     *
-     * @param context the ProcessContext to use in order to obtain configuration
-     * @return the path that is to be used to perform the listing, or <code>null</code> if not applicable.
-     */
-    protected abstract String getPath(final ProcessContext context);
-
-    /**
-     * Performs a listing of the remote entities that can be pulled. If any entity that is returned has already been "discovered" or "emitted"
-     * by this Processor, it will be ignored. A discussion of how the Processor determines those entities that have already been emitted is
-     * provided above in the documentation for this class. Any entity that is returned by this method with a timestamp prior to the minTimestamp
-     * will be filtered out by the Processor. Therefore, it is not necessary that implementations perform this filtering but can be more efficient
-     * if the filtering can be performed on the server side prior to retrieving the information.
-     *
-     * @param context      the ProcessContext to use in order to pull the appropriate entities
-     * @param minTimestamp the minimum timestamp of entities that should be returned.
-     * @return a Listing of entities that have a timestamp >= minTimestamp
-     */
-    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;
-
-    /**
-     * Determines whether or not the listing must be reset if the value of the given property is changed
-     *
-     * @param property the property that has changed
-     * @return <code>true</code> if a change in value of the given property necessitates that the listing be reset, <code>false</code> otherwise.
-     */
-    protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);
-
-    /**
-     * Returns a Scope that specifies where the state should be managed for this Processor
-     *
-     * @param context the ProcessContext to use in order to make a determination
-     * @return a Scope that specifies where the state should be managed for this Processor
-     */
-    protected abstract Scope getStateScope(final ProcessContext context);
-
-
-    private static class StringSerDe implements Serializer<String>, Deserializer<String> {
-        @Override
-        public String deserialize(final byte[] value) throws DeserializationException, IOException {
-            if (value == null) {
-                return null;
-            }
-
-            return new String(value, StandardCharsets.UTF_8);
-        }
-
-        @Override
-        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
-            out.write(value.getBytes(StandardCharsets.UTF_8));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
index 1f5ccdb..33d7867 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FileInfo;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
index f4557dc..b6932be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
@@ -26,6 +26,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processors.standard.util.FileInfo;
 import org.apache.nifi.processors.standard.util.FileTransfer;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
deleted file mode 100644
index 2d9525f..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/EntityListing.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util;
-
-import java.util.Collection;
-import java.util.Date;
-
-import javax.xml.bind.annotation.XmlTransient;
-import javax.xml.bind.annotation.XmlType;
-
-/**
- * A simple POJO for maintaining state about the entities returned by the most recent listing performed by an AbstractListProcessor, so that
- * we can avoid pulling the same file multiple times
- */
-@XmlType(name = "listing")
-public class EntityListing {
-
-    private Date latestTimestamp;
-    private Collection<String> matchingIdentifiers;
-
-    /**
-     * @return the modification date of the newest file that was contained in the listing
-     */
-    public Date getLatestTimestamp() {
-        return latestTimestamp;
-    }
-
-    /**
-     * Sets the timestamp of the modification date of the newest file that was contained in the listing
-     *
-     * @param latestTimestamp the timestamp of the modification date of the newest file that was contained in the listing
-     */
-    public void setLatestTimestamp(Date latestTimestamp) {
-        this.latestTimestamp = latestTimestamp;
-    }
-
-    /**
-     * @return a Collection containing the identifiers of all entities in the listing whose timestamp
-     *         was equal to {@link #getLatestTimestamp()}
-     */
-    @XmlTransient
-    public Collection<String> getMatchingIdentifiers() {
-        return matchingIdentifiers;
-    }
-
-    /**
-     * Sets the Collection containing the identifiers of all entities in the listing whose timestamp was
-     * equal to {@link #getLatestTimestamp()}
-     *
-     * @param matchingIdentifiers the identifiers of the entities whose last modified date matches the latest timestamp
-     */
-    public void setMatchingIdentifiers(Collection<String> matchingIdentifiers) {
-        this.matchingIdentifiers = matchingIdentifiers;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
index b893f75..ca6648b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileInfo.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.standard.util;
 
 import java.io.Serializable;
 
+import org.apache.nifi.processor.util.list.ListableEntity;
+
 public class FileInfo implements Comparable<FileInfo>, Serializable, ListableEntity {
 
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d90fbc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
deleted file mode 100644
index 6e019ff..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ListableEntity.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util;
-
-public interface ListableEntity {
-
-    /**
-     * @return The name of the remote entity
-     */
-    String getName();
-
-    /**
-     * @return the identifier of the remote entity. This may or may not be the same as the name of the
-     *         entity but should be unique across all entities.
-     */
-    String getIdentifier();
-
-
-    /**
-     * @return the timestamp for this entity, which is used to avoid listing the same
-     *         entities multiple times
-     */
-    long getTimestamp();
-
-}