You are viewing a plain-text version of this content. The canonical link to it ("here" in the original rendering) is not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by pv...@apache.org on 2019/08/28 08:33:47 UTC

[nifi] branch master updated: NIFI-6595: Fixed bug in TailFile that caused it not to properly honor the Initial Start Position after state has been restored

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9426d  NIFI-6595: Fixed bug in TailFile that caused it not to properly honor the Initial Start Position after state has been restored
3c9426d is described below

commit 3c9426d287d34a617496fc75b1e1ffde485da2e4
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Aug 27 16:14:27 2019 -0400

    NIFI-6595: Fixed bug in TailFile that caused it not to properly honor the Initial Start Position after state has been restored
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #3675.
---
 .../apache/nifi/processors/standard/TailFile.java  |  60 ++++++------
 .../nifi/processors/standard/TestTailFile.java     | 108 ++++++++++++++++++---
 2 files changed, 124 insertions(+), 44 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index bc9c476..22cc78e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -118,11 +118,6 @@ public class TailFile extends AbstractProcessor {
             "In this mode, the 'Files to tail' property accepts a regular expression and the processor will look"
             + " for files in 'Base directory' to list the files to tail by the processor.");
 
-    static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files "
-            + "where the log messages are appended have always the same name.");
-    static final AllowableValue CHANGING_NAME = new AllowableValue("Changing name", "Changing name", "With this rolling strategy, "
-            + "the files where the log messages are appended have not a fixed name (for example: filename contaning the current day.");
-
     static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
             "Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
     static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File",
@@ -322,9 +317,11 @@ public class TailFile extends AbstractProcessor {
         final Scope scope = getStateScope(context);
         final StateMap stateMap = context.getStateManager().getState(scope);
 
-        if (stateMap.getVersion() == -1L) {
+        final String startPosition = context.getProperty(START_POSITION).getValue();
+
+        if (stateMap.getVersion() == -1L || stateMap.toMap().isEmpty()) {
             //state has been cleared or never stored so recover as 'empty state'
-            initStates(filesToTail, Collections.emptyMap(), true);
+            initStates(filesToTail, Collections.emptyMap(), true, startPosition);
             recoverState(context, filesToTail, Collections.emptyMap());
             return;
         }
@@ -350,23 +347,23 @@ public class TailFile extends AbstractProcessor {
             getLogger().info("statesMap has been migrated. {}", new Object[]{migratedStatesMap});
         }
 
-        initStates(filesToTail, statesMap, false);
+        initStates(filesToTail, statesMap, false, startPosition);
         recoverState(context, filesToTail, statesMap);
     }
 
-    private void initStates(List<String> filesToTail, Map<String, String> statesMap, boolean isCleared) {
-        int i = 0;
+    private void initStates(final List<String> filesToTail, final Map<String, String> statesMap, final boolean isCleared, final String startPosition) {
+        int fileIndex = 0;
 
-        if(isCleared) {
+        if (isCleared) {
             states.clear();
         } else {
             // we have to deal with the case where NiFi has been restarted. In this
             // case 'states' object is empty but the statesMap is not. So we have to
             // put back the files we already know about in 'states' object before
             // doing the recovery
-            if(states.isEmpty() && !statesMap.isEmpty()) {
-                for(String key : statesMap.keySet()) {
-                    if(key.endsWith(TailFileState.StateKeys.FILENAME)) {
+            if( states.isEmpty() && !statesMap.isEmpty()) {
+                for (String key : statesMap.keySet()) {
+                    if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
                         int index = Integer.valueOf(key.split("\\.")[1]);
                         states.put(statesMap.get(key), new TailFileObject(index, statesMap));
                     }
@@ -374,8 +371,8 @@ public class TailFile extends AbstractProcessor {
             }
 
             // first, we remove the files that are no longer present
-            List<String> toBeRemoved = new ArrayList<String>();
-            for(String file : states.keySet()) {
+            final List<String> toBeRemoved = new ArrayList<String>();
+            for (String file : states.keySet()) {
                 if(!filesToTail.contains(file)) {
                     toBeRemoved.add(file);
                     cleanReader(states.get(file));
@@ -385,21 +382,22 @@ public class TailFile extends AbstractProcessor {
 
             // then we need to get the highest ID used so far to be sure
             // we don't mix different files in case we add new files to tail
-            for(String file : states.keySet()) {
-                if(i <= states.get(file).getFilenameIndex()) {
-                    i = states.get(file).getFilenameIndex() + 1;
+            for (String file : states.keySet()) {
+                if (fileIndex <= states.get(file).getFilenameIndex()) {
+                    fileIndex = states.get(file).getFilenameIndex() + 1;
                 }
             }
 
         }
 
-        for (String file : filesToTail) {
-            if(isCleared || !states.containsKey(file)) {
-                states.put(file, new TailFileObject(i));
-                i++;
+        for (String filename : filesToTail) {
+            if (isCleared || !states.containsKey(filename)) {
+                final TailFileState tailFileState = new TailFileState(filename, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536));
+                states.put(filename, new TailFileObject(fileIndex, tailFileState, true));
+
+                fileIndex++;
             }
         }
-
     }
 
     private void recoverState(final ProcessContext context, final List<String> filesToTail, final Map<String, String> map) throws IOException {
@@ -585,7 +583,7 @@ public class TailFile extends AbstractProcessor {
                     final List<String> filesToTail = lookup(context);
                     final Scope scope = getStateScope(context);
                     final StateMap stateMap = context.getStateManager().getState(scope);
-                    initStates(filesToTail, stateMap.toMap(), false);
+                    initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue());
                 } catch (IOException e) {
                     getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
                     context.yield();
@@ -641,7 +639,7 @@ public class TailFile extends AbstractProcessor {
 
                     final Checksum checksum = new CRC32();
                     final long position = file.length();
-                    final long timestamp = file.lastModified();
+                    final long timestamp = file.lastModified() + 1;
 
                     try (final InputStream fis = new FileInputStream(file);
                             final CheckedInputStream in = new CheckedInputStream(fis, checksum)) {
@@ -1229,8 +1227,14 @@ public class TailFile extends AbstractProcessor {
         private int filenameIndex;
         private boolean tailFileChanged = true;
 
-        public TailFileObject(int i) {
-            this.filenameIndex = i;
+        public TailFileObject(int index) {
+            this.filenameIndex = index;
+        }
+
+        public TailFileObject(final int index, final TailFileState fileState, final boolean tailFileChanged) {
+            this.filenameIndex = index;
+            this.tailFileChanged = tailFileChanged; // honor the caller's flag instead of hard-coding true
+            this.state = fileState;
         }
 
         public TailFileObject(int index, Map<String, String> statesMap) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 4c82703..97e1688 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,11 +16,17 @@
  */
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
@@ -31,25 +37,22 @@ import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.processors.standard.TailFile.TailFileState;
-import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeFalse;
-import org.junit.Before;
-import org.junit.Test;
 
 public class TestTailFile {
 
@@ -113,8 +116,81 @@ public class TestTailFile {
         }
 
         processor.cleanup();
+
+        final File[] files = file.getParentFile().listFiles();
+        if (files != null) {
+            for (final File file : files) {
+                if (file.getName().endsWith(".log")) {
+                    file.delete();
+                }
+            }
+        }
     }
 
+
+    @Test
+    public void testRotateMultipleBeforeConsuming() throws IOException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
+
+        raf.write("1\n".getBytes());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+        raf.write("1.5\n".getBytes());
+        rollover(0);
+        raf.write("2\n".getBytes());
+        rollover(1);
+        raf.write("3\n".getBytes());
+        rollover(2);
+        raf.write("4\n".getBytes());
+
+        rollover(3);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 5);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
+        final Set<String> lines = flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toSet());
+        assertEquals(5, lines.size());
+        assertTrue(lines.contains("1\n"));
+        assertTrue(lines.contains("1.5\n"));
+        assertTrue(lines.contains("2\n"));
+        assertTrue(lines.contains("3\n"));
+        assertTrue(lines.contains("4\n"));
+
+        runner.clearTransferState();
+    }
+
+
+    @Test
+    public void testStartPositionCurrentTime() throws IOException {
+        raf.write("1\n".getBytes());
+        rollover(0);
+        raf.write("2\n".getBytes());
+        rollover(1);
+        raf.write("3\n4\n5\n".getBytes());
+
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("6\n".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+        out.assertContentEquals("6\n");
+    }
+
+    private void rollover(final int index) throws IOException {
+        raf.close();
+        assertTrue("Failed to roll over " + file, file.renameTo(new File(file.getParentFile(), file.getName() + "." + index + ".log"))); // fail fast: renameTo signals failure only via its return value
+        raf = new RandomAccessFile(file, "rw");
+    }
+
+
     @Test
     public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");