You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/12/02 12:54:52 UTC

nifi git commit: NIFI-3141: Fixed TailFile ArrayIndexOutOfBounds.

Repository: nifi
Updated Branches:
  refs/heads/master afe742700 -> 8da38acf3


NIFI-3141: Fixed TailFile ArrayIndexOutOfBounds.

- Added unit test cases to simulate NiFi version update which fails without this fix.
- Added state object migration code, add file.0. prefix to state keys,
  and add length from stored position.

This closes #1289


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8da38acf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8da38acf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8da38acf

Branch: refs/heads/master
Commit: 8da38acf31688569bcc0a1d79c2f90d2e4e535d4
Parents: afe7427
Author: Koji Kawamura <ij...@apache.org>
Authored: Fri Dec 2 11:23:33 2016 +0900
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Dec 2 07:53:04 2016 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 28 ++++++
 .../nifi/processors/standard/TestTailFile.java  | 94 ++++++++++++++++++++
 2 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8da38acf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 3553ce8..c5fcefb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -345,6 +345,25 @@ public class TailFile extends AbstractProcessor {
 
         Map<String, String> statesMap = stateMap.toMap();
 
+        if (statesMap.containsKey(TailFileState.StateKeys.FILENAME)
+                && !statesMap.keySet().stream().anyMatch(key -> key.startsWith(MAP_PREFIX))) {
+            // If statesMap contains "filename" key without "file.0." prefix,
+            // and there's no key with "file." prefix, then
+            // it indicates that the statesMap is created with earlier version of NiFi.
+            // In this case, we need to migrate the state by adding prefix indexed with 0.
+            final Map<String, String> migratedStatesMap = new HashMap<>(statesMap.size());
+            for (String key : statesMap.keySet()) {
+                migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key));
+            }
+
+            // LENGTH is added from NiFi 1.1.0. Set the value with using the last position so that we can use existing state
+            // to avoid sending duplicated log data after updating NiFi.
+            migratedStatesMap.put(MAP_PREFIX + "0." + TailFileState.StateKeys.LENGTH, statesMap.get(TailFileState.StateKeys.POSITION));
+            statesMap = Collections.unmodifiableMap(migratedStatesMap);
+
+            getLogger().info("statesMap has been migrated. {}", new Object[]{migratedStatesMap});
+        }
+
         initStates(filesToTail, statesMap, false);
         recoverState(context, filesToTail, statesMap);
     }
@@ -931,6 +950,15 @@ public class TailFile extends AbstractProcessor {
             Map<String, String> updatedState = new HashMap<String, String>();
 
             for(String key : oldState.toMap().keySet()) {
+                // These states are stored by older version of NiFi, and won't be used anymore.
+                // New states have 'file.<index>.' prefix.
+                if (TailFileState.StateKeys.CHECKSUM.equals(key)
+                        || TailFileState.StateKeys.FILENAME.equals(key)
+                        || TailFileState.StateKeys.POSITION.equals(key)
+                        || TailFileState.StateKeys.TIMESTAMP.equals(key)) {
+                    getLogger().info("Removed state {}={} stored by older version of NiFi.", new Object[]{key, oldState.get(key)});
+                    continue;
+                }
                 updatedState.put(key, oldState.get(key));
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8da38acf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 0cda3f0..efd314c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -18,16 +18,26 @@ package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Pattern;
 
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -39,6 +49,7 @@ import org.junit.Test;
 public class TestTailFile {
 
     private File file;
+    private File existingFile;
     private File otherFile;
 
     private RandomAccessFile raf;
@@ -56,6 +67,19 @@ public class TestTailFile {
         file.delete();
         assertTrue(file.createNewFile());
 
+        existingFile = new File("target/existing-log.txt");
+        existingFile.delete();
+        assertTrue(existingFile.createNewFile());
+        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(existingFile)))) {
+            writer.write("Line 1");
+            writer.newLine();
+            writer.write("Line 2");
+            writer.newLine();
+            writer.write("Line 3");
+            writer.newLine();
+            writer.flush();
+        }
+
         File directory = new File("target/testDir");
         if(!directory.exists()) {
             assertTrue(directory.mkdirs());
@@ -812,6 +836,76 @@ public class TestTailFile {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testMigrateFrom100To110() throws IOException {
+
+        runner.setProperty(TailFile.FILENAME, "target/existing-log.txt");
+
+        final MockStateManager stateManager = runner.getStateManager();
+
+        // Before NiFi 1.1.0, TailFile only handles single file
+        // and state key doesn't have index in it.
+        final Map<String, String> state = new HashMap<>();
+        state.put("filename", "target/existing-log.txt");
+        // Simulate that it has been tailed up to the 2nd line.
+        state.put("checksum", "2279929157");
+        state.put("position", "14");
+        state.put("timestamp", "1480639134000");
+        stateManager.setState(state, Scope.LOCAL);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).iterator().next();
+
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos))) {
+            writer.write("Line 3");
+            writer.newLine();
+        }
+
+        flowFile.assertContentEquals(bos.toByteArray());
+
+        // The old states should be replaced with new ones.
+        final StateMap updatedState = stateManager.getState(Scope.LOCAL);
+        assertNull(updatedState.get("filename"));
+        assertNull(updatedState.get("checksum"));
+        assertNull(updatedState.get("position"));
+        assertNull(updatedState.get("timestamp"));
+        assertEquals("target/existing-log.txt", updatedState.get("file.0.filename"));
+        assertEquals("3380848603", updatedState.get("file.0.checksum"));
+        assertEquals("21", updatedState.get("file.0.position"));
+        assertNotNull(updatedState.get("file.0.timestamp"));
+
+        // When it runs again, the state is already migrated, so it shouldn't emit any flow files.
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+    }
+
+
+    @Test
+    public void testMigrateFrom100To110FileNotFound() throws IOException {
+
+        runner.setProperty(TailFile.FILENAME, "target/not-existing-log.txt");
+
+        final MockStateManager stateManager = runner.getStateManager();
+
+        // Before NiFi 1.1.0, TailFile only handles single file
+        // and state key doesn't have index in it.
+        final Map<String, String> state = new HashMap<>();
+        state.put("filename", "target/not-existing-log.txt");
+        // Simulate that it has been tailed up to the 2nd line.
+        state.put("checksum", "2279929157");
+        state.put("position", "14");
+        state.put("timestamp", "1480639134000");
+        stateManager.setState(state, Scope.LOCAL);
+
+        runner.run();
+
+        runner.assertTransferCount(TailFile.REL_SUCCESS, 0);
+    }
+
     private void cleanFiles(String directory) {
         final File targetDir = new File(directory);
         if(targetDir.exists()) {