You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2016/02/07 23:40:04 UTC

nifi git commit: NIFI-1483 Correcting logic in terms of when local persistence files are removed during the migration process.

Repository: nifi
Updated Branches:
  refs/heads/master ea5818c39 -> 2673370cb


NIFI-1483 Correcting logic in terms of when local persistence files are removed during the migration process.

Reviewed by Tony Kurc (tkurc@apache.org). This closes #206


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2673370c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2673370c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2673370c

Branch: refs/heads/master
Commit: 2673370cba9678e879cc78899a62f99bff611f9a
Parents: ea5818c
Author: Aldrin Piri <al...@apache.org>
Authored: Sat Feb 6 18:38:00 2016 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sun Feb 7 17:40:12 2016 -0500

----------------------------------------------------------------------
 .../standard/AbstractListProcessor.java         | 11 ++--
 .../standard/TestAbstractListProcessor.java     | 60 +++++++++++++++++++-
 2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2673370c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 246f71a..b04deb3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -218,12 +218,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
             }
         }
 
-        // delete the local file, since it is no longer needed
-        final File localFile = new File(path);
-        if (localFile.exists() && !localFile.delete()) {
-            getLogger().warn("Migrated state but failed to delete local persistence file");
-        }
-
         // remove entry from Distributed cache server
         if (client != null) {
             try {
@@ -285,6 +279,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                     latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
                 }
             }
+
+            // delete the local file, since it is no longer needed
+            if (persistenceFile.exists() && !persistenceFile.delete()) {
+                getLogger().warn("Migrated state but failed to delete local persistence file");
+            }
         }
 
         if (minTimestamp != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/2673370c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index 3a432e7..7544eb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.commons.io.Charsets;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -42,10 +44,16 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class TestAbstractListProcessor {
 
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder();
+
     @Test
     public void testOnlyNewEntriesEmitted() {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
@@ -121,7 +129,7 @@ public class TestAbstractListProcessor {
     }
 
     @Test
-    public void testStateMigrated() throws InitializationException {
+    public void testStateMigratedFromCacheService() throws InitializationException {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         final DistributedCache cache = new DistributedCache();
@@ -143,6 +151,50 @@ public class TestAbstractListProcessor {
     }
 
     @Test
+    public void testNoStateToMigrate() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        runner.run();
+
+        final MockStateManager stateManager = runner.getStateManager();
+        final Map<String, String> expectedState = new HashMap<>();
+        stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
+    }
+
+    @Test
+    public void testStateMigratedFromLocalFile() throws Exception {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // Create a file that we will populate with the desired state
+        File persistenceFile = testFolder.newFile(proc.persistenceFilename);
+        // Override the processor's internal persistence file
+        proc.persistenceFile = persistenceFile;
+
+        // Local File persistence was a properties file format of <key>=<JSON entity listing representation>
+        // Our ConcreteListProcessor is centered around files which are provided for a given path
+        final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
+
+        // Create a persistence file of the format anticipated
+        try (FileOutputStream fos = new FileOutputStream(persistenceFile);) {
+            fos.write(serviceState.getBytes(Charsets.UTF_8));
+        }
+
+        runner.run();
+
+        // Verify the local persistence file is removed
+        Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists());
+
+        // Verify the state manager now maintains the associated state
+        final Map<String, String> expectedState = new HashMap<>();
+        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id");
+
+        runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
+    }
+
+    @Test
     public void testFetchOnStart() throws InitializationException {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -239,9 +291,13 @@ public class TestAbstractListProcessor {
     private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
         private final List<ListableEntity> entities = new ArrayList<>();
 
+        public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json";
+        public String persistenceFolder = "target/";
+        public File persistenceFile = new File(persistenceFolder + persistenceFilename);
+
         @Override
         protected File getPersistenceFile() {
-            return new File("target/ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json");
+            return persistenceFile;
         }
 
         public void addEntity(final String name, final String identifier, final long timestamp) {