You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2016/02/07 23:40:04 UTC
nifi git commit: NIFI-1483 Correcting logic in terms of when local persistence files are removed during the migration process.
Repository: nifi
Updated Branches:
refs/heads/master ea5818c39 -> 2673370cb
NIFI-1483 Correcting logic in terms of when local persistence files are removed during the migration process.
Reviewed by Tony Kurc (tkurc@apache.org). This closes #206
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2673370c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2673370c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2673370c
Branch: refs/heads/master
Commit: 2673370cba9678e879cc78899a62f99bff611f9a
Parents: ea5818c
Author: Aldrin Piri <al...@apache.org>
Authored: Sat Feb 6 18:38:00 2016 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Sun Feb 7 17:40:12 2016 -0500
----------------------------------------------------------------------
.../standard/AbstractListProcessor.java | 11 ++--
.../standard/TestAbstractListProcessor.java | 60 +++++++++++++++++++-
2 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2673370c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 246f71a..b04deb3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -218,12 +218,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
}
}
- // delete the local file, since it is no longer needed
- final File localFile = new File(path);
- if (localFile.exists() && !localFile.delete()) {
- getLogger().warn("Migrated state but failed to delete local persistence file");
- }
-
// remove entry from Distributed cache server
if (client != null) {
try {
@@ -285,6 +279,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
}
}
+
+ // delete the local file, since it is no longer needed
+ if (persistenceFile.exists() && !persistenceFile.delete()) {
+ getLogger().warn("Migrated state but failed to delete local persistence file");
+ }
}
if (minTimestamp != null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/2673370c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index 3a432e7..7544eb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.commons.io.Charsets;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -42,10 +44,16 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class TestAbstractListProcessor {
+ @Rule
+ public final TemporaryFolder testFolder = new TemporaryFolder();
+
@Test
public void testOnlyNewEntriesEmitted() {
final ConcreteListProcessor proc = new ConcreteListProcessor();
@@ -121,7 +129,7 @@ public class TestAbstractListProcessor {
}
@Test
- public void testStateMigrated() throws InitializationException {
+ public void testStateMigratedFromCacheService() throws InitializationException {
final ConcreteListProcessor proc = new ConcreteListProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
final DistributedCache cache = new DistributedCache();
@@ -143,6 +151,50 @@ public class TestAbstractListProcessor {
}
@Test
+ public void testNoStateToMigrate() throws Exception {
+ final ConcreteListProcessor proc = new ConcreteListProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+
+ runner.run();
+
+ final MockStateManager stateManager = runner.getStateManager();
+ final Map<String, String> expectedState = new HashMap<>();
+ stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
+ }
+
+ @Test
+ public void testStateMigratedFromLocalFile() throws Exception {
+ final ConcreteListProcessor proc = new ConcreteListProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+
+ // Create a file that we will populate with the desired state
+ File persistenceFile = testFolder.newFile(proc.persistenceFilename);
+ // Override the processor's internal persistence file
+ proc.persistenceFile = persistenceFile;
+
+ // Local File persistence was a properties file format of <key>=<JSON entity listing representation>
+ // Our ConcreteListProcessor is centered around files which are provided for a given path
+ final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
+
+ // Create a persistence file of the format anticipated
+ try (FileOutputStream fos = new FileOutputStream(persistenceFile);) {
+ fos.write(serviceState.getBytes(Charsets.UTF_8));
+ }
+
+ runner.run();
+
+ // Verify the local persistence file is removed
+ Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists());
+
+ // Verify the state manager now maintains the associated state
+ final Map<String, String> expectedState = new HashMap<>();
+ expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
+ expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id");
+
+ runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
+ }
+
+ @Test
public void testFetchOnStart() throws InitializationException {
final ConcreteListProcessor proc = new ConcreteListProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -239,9 +291,13 @@ public class TestAbstractListProcessor {
private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
private final List<ListableEntity> entities = new ArrayList<>();
+ public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json";
+ public String persistenceFolder = "target/";
+ public File persistenceFile = new File(persistenceFolder + persistenceFilename);
+
@Override
protected File getPersistenceFile() {
- return new File("target/ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json");
+ return persistenceFile;
}
public void addEntity(final String name, final String identifier, final long timestamp) {