You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2019/08/28 08:33:47 UTC
[nifi] branch master updated: NIFI-6595: Fixed bug in TailFile that
caused it not to properly honor the Initial Start Position after state has
been restored
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9426d NIFI-6595: Fixed bug in TailFile that caused it not to properly honor the Initial Start Position after state has been restored
3c9426d is described below
commit 3c9426d287d34a617496fc75b1e1ffde485da2e4
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Aug 27 16:14:27 2019 -0400
NIFI-6595: Fixed bug in TailFile that caused it not to properly honor the Initial Start Position after state has been restored
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #3675.
---
.../apache/nifi/processors/standard/TailFile.java | 60 ++++++------
.../nifi/processors/standard/TestTailFile.java | 108 ++++++++++++++++++---
2 files changed, 124 insertions(+), 44 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index bc9c476..22cc78e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -118,11 +118,6 @@ public class TailFile extends AbstractProcessor {
"In this mode, the 'Files to tail' property accepts a regular expression and the processor will look"
+ " for files in 'Base directory' to list the files to tail by the processor.");
- static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files "
- + "where the log messages are appended have always the same name.");
- static final AllowableValue CHANGING_NAME = new AllowableValue("Changing name", "Changing name", "With this rolling strategy, "
- + "the files where the log messages are appended have not a fixed name (for example: filename contaning the current day.");
-
static final AllowableValue START_BEGINNING_OF_TIME = new AllowableValue("Beginning of Time", "Beginning of Time",
"Start with the oldest data that matches the Rolling Filename Pattern and then begin reading from the File to Tail");
static final AllowableValue START_CURRENT_FILE = new AllowableValue("Beginning of File", "Beginning of File",
@@ -322,9 +317,11 @@ public class TailFile extends AbstractProcessor {
final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope);
- if (stateMap.getVersion() == -1L) {
+ final String startPosition = context.getProperty(START_POSITION).getValue();
+
+ if (stateMap.getVersion() == -1L || stateMap.toMap().isEmpty()) {
//state has been cleared or never stored so recover as 'empty state'
- initStates(filesToTail, Collections.emptyMap(), true);
+ initStates(filesToTail, Collections.emptyMap(), true, startPosition);
recoverState(context, filesToTail, Collections.emptyMap());
return;
}
@@ -350,23 +347,23 @@ public class TailFile extends AbstractProcessor {
getLogger().info("statesMap has been migrated. {}", new Object[]{migratedStatesMap});
}
- initStates(filesToTail, statesMap, false);
+ initStates(filesToTail, statesMap, false, startPosition);
recoverState(context, filesToTail, statesMap);
}
- private void initStates(List<String> filesToTail, Map<String, String> statesMap, boolean isCleared) {
- int i = 0;
+ private void initStates(final List<String> filesToTail, final Map<String, String> statesMap, final boolean isCleared, final String startPosition) {
+ int fileIndex = 0;
- if(isCleared) {
+ if (isCleared) {
states.clear();
} else {
// we have to deal with the case where NiFi has been restarted. In this
// case 'states' object is empty but the statesMap is not. So we have to
// put back the files we already know about in 'states' object before
// doing the recovery
- if(states.isEmpty() && !statesMap.isEmpty()) {
- for(String key : statesMap.keySet()) {
- if(key.endsWith(TailFileState.StateKeys.FILENAME)) {
+ if( states.isEmpty() && !statesMap.isEmpty()) {
+ for (String key : statesMap.keySet()) {
+ if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
int index = Integer.valueOf(key.split("\\.")[1]);
states.put(statesMap.get(key), new TailFileObject(index, statesMap));
}
@@ -374,8 +371,8 @@ public class TailFile extends AbstractProcessor {
}
// first, we remove the files that are no longer present
- List<String> toBeRemoved = new ArrayList<String>();
- for(String file : states.keySet()) {
+ final List<String> toBeRemoved = new ArrayList<String>();
+ for (String file : states.keySet()) {
if(!filesToTail.contains(file)) {
toBeRemoved.add(file);
cleanReader(states.get(file));
@@ -385,21 +382,22 @@ public class TailFile extends AbstractProcessor {
// then we need to get the highest ID used so far to be sure
// we don't mix different files in case we add new files to tail
- for(String file : states.keySet()) {
- if(i <= states.get(file).getFilenameIndex()) {
- i = states.get(file).getFilenameIndex() + 1;
+ for (String file : states.keySet()) {
+ if (fileIndex <= states.get(file).getFilenameIndex()) {
+ fileIndex = states.get(file).getFilenameIndex() + 1;
}
}
}
- for (String file : filesToTail) {
- if(isCleared || !states.containsKey(file)) {
- states.put(file, new TailFileObject(i));
- i++;
+ for (String filename : filesToTail) {
+ if (isCleared || !states.containsKey(filename)) {
+ final TailFileState tailFileState = new TailFileState(filename, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536));
+ states.put(filename, new TailFileObject(fileIndex, tailFileState, true));
+
+ fileIndex++;
}
}
-
}
private void recoverState(final ProcessContext context, final List<String> filesToTail, final Map<String, String> map) throws IOException {
@@ -585,7 +583,7 @@ public class TailFile extends AbstractProcessor {
final List<String> filesToTail = lookup(context);
final Scope scope = getStateScope(context);
final StateMap stateMap = context.getStateManager().getState(scope);
- initStates(filesToTail, stateMap.toMap(), false);
+ initStates(filesToTail, stateMap.toMap(), false, context.getProperty(START_POSITION).getValue());
} catch (IOException e) {
getLogger().error("Exception raised while attempting to recover state about where the tailing last left off", e);
context.yield();
@@ -641,7 +639,7 @@ public class TailFile extends AbstractProcessor {
final Checksum checksum = new CRC32();
final long position = file.length();
- final long timestamp = file.lastModified();
+ final long timestamp = file.lastModified() + 1;
try (final InputStream fis = new FileInputStream(file);
final CheckedInputStream in = new CheckedInputStream(fis, checksum)) {
@@ -1229,8 +1227,14 @@ public class TailFile extends AbstractProcessor {
private int filenameIndex;
private boolean tailFileChanged = true;
- public TailFileObject(int i) {
- this.filenameIndex = i;
+ public TailFileObject(int index) {
+ this.filenameIndex = index;
+ }
+
+ public TailFileObject(final int index, final TailFileState fileState, final boolean tailFileChanged) {
+ this.filenameIndex = index;
+ this.tailFileChanged = true;
+ this.state = fileState;
}
public TailFileObject(int index, Map<String, String> statesMap) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 4c82703..97e1688 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,11 +16,17 @@
*/
package org.apache.nifi.processors.standard;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
@@ -31,25 +37,22 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
-import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.processors.standard.TailFile.TailFileState;
-import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.After;
-import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
-import org.junit.Before;
-import org.junit.Test;
public class TestTailFile {
@@ -113,8 +116,81 @@ public class TestTailFile {
}
processor.cleanup();
+
+ final File[] files = file.getParentFile().listFiles();
+ if (files != null) {
+ for (final File file : files) {
+ if (file.getName().endsWith(".log")) {
+ file.delete();
+ }
+ }
+ }
}
+
+ @Test
+ public void testRotateMultipleBeforeConsuming() throws IOException {
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
+ runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
+
+ raf.write("1\n".getBytes());
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+ raf.write("1.5\n".getBytes());
+ rollover(0);
+ raf.write("2\n".getBytes());
+ rollover(1);
+ raf.write("3\n".getBytes());
+ rollover(2);
+ raf.write("4\n".getBytes());
+
+ rollover(3);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 5);
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
+ final Set<String> lines = flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toSet());
+ assertEquals(5, lines.size());
+ assertTrue(lines.contains("1\n"));
+ assertTrue(lines.contains("1.5\n"));
+ assertTrue(lines.contains("2\n"));
+ assertTrue(lines.contains("3\n"));
+ assertTrue(lines.contains("4\n"));
+
+ runner.clearTransferState();
+ }
+
+
+ @Test
+ public void testStartPositionCurrentTime() throws IOException {
+ raf.write("1\n".getBytes());
+ rollover(0);
+ raf.write("2\n".getBytes());
+ rollover(1);
+ raf.write("3\n4\n5\n".getBytes());
+
+ runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_TIME.getValue());
+ runner.run();
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+ raf.write("6\n".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+ out.assertContentEquals("6\n");
+ }
+
+ private void rollover(final int index) throws IOException {
+ raf.close();
+ file.renameTo(new File(file.getParentFile(), file.getName() + "." + index + ".log"));
+ raf = new RandomAccessFile(file, "rw");
+ }
+
+
@Test
public void testConsumeAfterTruncationStartAtBeginningOfFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");