You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/03/31 11:43:52 UTC

[nifi] branch main updated: NIFI-8344: If configured to tail a file for some period of time post-rollover, ensure that we only consume lines that are fully written (i.e., end with a newline). Once we stop tailing that file post-rollover, consume any data from that file that has not yet been consumed, up to the end of the file, even if there is no newline.

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2edf551  NIFI-8344: If configured to tail a file for some period of time post-rollover, ensure that we only consume lines that are fully written (i.e., end with a newline). Once we stop tailing that file post-rollover, consume any data from that file that has not yet been consumed, up to the end of the file, even if there is no newline.
2edf551 is described below

commit 2edf5514b72691fb77a3fe391a48808155d3b29b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Mar 25 14:28:47 2021 -0400

    NIFI-8344: If configured to tail a file for some period of time post-rollover, ensure that we only consume lines that are fully written (i.e., end with a newline). Once we stop tailing that file post-rollover, consume any data from that file that has not yet been consumed, up to the end of the file, even if there is no newline.
    
    This closes #4937.
    
    Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
 .../apache/nifi/processors/standard/TailFile.java  | 271 +++++++++++++--------
 .../nifi/processors/standard/TestTailFile.java     |  10 +-
 2 files changed, 178 insertions(+), 103 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 6ca3f23..01cf285 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -46,6 +46,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.TailFile.TailFileState.StateKeys;
 import org.apache.nifi.stream.io.NullOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 
@@ -176,7 +177,9 @@ public class TailFile extends AbstractProcessor {
         .name("Post-Rollover Tail Period")
         .description("When a file is rolled over, the processor will continue tailing the rolled over file until it has not been modified for this amount of time. " +
             "This allows for another process to rollover a file, and then flush out any buffered data. Note that when this value is set, and the tailed file rolls over, " +
-            "the new file will not be tailed until the old file has not been modified for the configured amount of time.")
+            "the new file will not be tailed until the old file has not been modified for the configured amount of time. Additionally, when using this capability, in order to avoid data " +
+            "duplication, this period must be set longer than the Processor's Run Schedule, and the Processor must not be stopped after the file being tailed has been " +
+            "rolled over and before the data has been fully consumed. Otherwise, the data may be duplicated, as the entire file may be written out as the contents of a single FlowFile.")
         .required(false)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .expressionLanguageSupported(NONE)
@@ -253,7 +256,7 @@ public class TailFile extends AbstractProcessor {
             .description("All FlowFiles are routed to this Relationship.")
             .build();
 
-    private volatile Map<String, TailFileObject> states = new HashMap<String, TailFileObject>();
+    private volatile Map<String, TailFileObject> states = new HashMap<>();
     private volatile AtomicLong lastLookup = new AtomicLong(0L);
     private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
     private volatile boolean requireStateLookup = true;
@@ -283,7 +286,7 @@ public class TailFile extends AbstractProcessor {
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         if (isConfigurationRestored() && FILENAME.equals(descriptor)) {
-            states = new HashMap<String, TailFileObject>();
+            states = new HashMap<>();
         }
     }
 
@@ -324,7 +327,7 @@ public class TailFile extends AbstractProcessor {
         long maxAge = context.getProperty(MAXIMUM_AGE).getValue() == null ? Long.MAX_VALUE : context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
 
         // get list of files to tail
-        List<String> filesToTail = new ArrayList<String>();
+        List<String> filesToTail = new ArrayList<>();
 
         if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
             filesToTail.addAll(getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
@@ -394,7 +397,7 @@ public class TailFile extends AbstractProcessor {
             if( states.isEmpty() && !statesMap.isEmpty()) {
                 for (String key : statesMap.keySet()) {
                     if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
-                        int index = Integer.valueOf(key.split("\\.")[1]);
+                        int index = Integer.parseInt(key.split("\\.")[1]);
                         states.put(statesMap.get(key), new TailFileObject(index, statesMap));
                     }
                 }
@@ -423,7 +426,7 @@ public class TailFile extends AbstractProcessor {
         for (String filename : filesToTail) {
             if (isCleared || !states.containsKey(filename)) {
                 final TailFileState tailFileState = new TailFileState(filename, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536));
-                states.put(filename, new TailFileObject(fileIndex, tailFileState, true));
+                states.put(filename, new TailFileObject(fileIndex, tailFileState));
 
                 fileIndex++;
             }
@@ -446,7 +449,7 @@ public class TailFile extends AbstractProcessor {
      */
     private List<String> getFilesToTail(final String baseDir, String fileRegex, boolean isRecursive, long maxAge) {
         final Collection<File> files = FileUtils.listFiles(new File(baseDir), null, isRecursive);
-        final List<String> result = new ArrayList<String>();
+        final List<String> result = new ArrayList<>();
 
         final String baseDirNoTrailingSeparator = baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
         final String fullRegex;
@@ -1042,7 +1045,7 @@ public class TailFile extends AbstractProcessor {
         // Sort files based on last modified timestamp. If same timestamp, use filename as a secondary sort, as often
         // files that are rolled over are given a naming scheme that is lexicographically sort in the same order as the
         // timestamp, such as yyyy-MM-dd-HH-mm-ss
-        Collections.sort(rolledOffFiles, new Comparator<File>() {
+        rolledOffFiles.sort(new Comparator<File>() {
             @Override
             public int compare(final File o1, final File o2) {
                 final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified());
@@ -1073,7 +1076,7 @@ public class TailFile extends AbstractProcessor {
     private void persistState(final Map<String, String> state, final ProcessSession session, final ProcessContext context) {
         try {
             final StateMap oldState = session.getState(getStateScope(context));
-            Map<String, String> updatedState = new HashMap<String, String>();
+            Map<String, String> updatedState = new HashMap<>();
 
             for(String key : oldState.toMap().keySet()) {
                 // These states are stored by older version of NiFi, and won't be used anymore.
@@ -1191,88 +1194,71 @@ public class TailFile extends AbstractProcessor {
             getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
             TailFileObject tfo = states.get(tailFile);
 
+            final long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+            final boolean tailingPostRollover = tfo.getState().isTailingPostRollover();
+            final boolean shouldTailPostRollover = postRolloverTailMillis > 0;
+
             // For first file that we find, it may or may not be the file that we were last reading from.
             // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
             // then we know we've already processed this file. If the checksums do not match, then we have not
             // processed this file and we need to seek back to position 0 and ingest the entire file.
             // For all other files that have been rolled over, we need to just ingest the entire file.
             boolean rolloverOccurred = !rolledOffFiles.isEmpty();
-            if (rolloverOccurred && expectedChecksum != null && rolledOffFiles.get(0).length() >= position) {
+
+            final boolean tailFirstFile;
+            if (rolloverOccurred) {
                 final File firstFile = rolledOffFiles.get(0);
+                final long millisSinceModified = System.currentTimeMillis() - firstFile.lastModified();
+                final boolean fileGrew = firstFile.length() >= position && position > 0;
+                final boolean tailRolledFile = postRolloverTailMillis == 0 || millisSinceModified < postRolloverTailMillis;
+                tailFirstFile = fileGrew && tailRolledFile && expectedChecksum != null;
+            } else {
+                tailFirstFile = false;
+            }
 
-                final long startNanos = System.nanoTime();
-                final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
-                if (position > 0) {
-                    try (final FileInputStream fis = new FileInputStream(firstFile);
-                            final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
-                        StreamUtils.copy(in, new NullOutputStream(), position);
+            if (tailFirstFile) {
+                final File firstFile = rolledOffFiles.get(0);
 
-                        final long checksumResult = in.getChecksum().getValue();
-                        if (checksumResult == expectedChecksum) {
-                            getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[]{firstFile, position});
-
-                            // This is the same file that we were reading when we shutdown. Start reading from this point on.
-                            rolledOffFiles.remove(0);
-                            FlowFile flowFile = session.create();
-
-                            final TailFileState currentState = tfo.getState();
-                            final Checksum checksum = currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
-                            final ByteBuffer buffer = currentState.getBuffer() == null ? ByteBuffer.allocate(65536) : currentState.getBuffer();
-                            final FileChannel channel = fis.getChannel();
-                            final long timestamp = firstFile.lastModified();
-
-                            try {
-                                flowFile = session.write(flowFile, out -> readLines(channel, buffer, out, checksum, reReadOnNul, true));
-                            } catch (NulCharacterEncounteredException ncee) {
-                                rolledOffFiles.add(0, firstFile);
-                                session.remove(flowFile);
-                                throw ncee;
-                            }
+                final boolean consumed;
+                if (shouldTailPostRollover) {
+                    // User has configured to continue tailing file after it has been rolled over, until it's no longer being modified.
+                    // Consume any newly added lines from the rolled over file, but do not consume the last line, if it doesn't have a newline.
+                    // Keep the state indicating that we are currently tailing a file post-rollover.
+                    consumed = tailRolledFile(context, session, tailFile, expectedChecksum, position, tfo, firstFile, false, true);
+                } else {
+                    // User has not configured to continue tailing file after it has been rolled over. If any data was written to the rolled file before
+                    // rolling it over, consume that data, up to the end of the file, including the last line, even if it doesn't have a newline.
+                    consumed = tailRolledFile(context, session, tailFile, expectedChecksum, position, tfo, firstFile, true, false);
+                }
 
-                            if (flowFile.getSize() == 0L) {
-                                session.remove(flowFile);
-                            } else {
-                                final Map<String, String> attributes = new HashMap<>(3);
-                                attributes.put(CoreAttributes.FILENAME.key(), firstFile.getName());
-                                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-                                attributes.put("tailfile.original.path", tailFile);
-                                flowFile = session.putAllAttributes(flowFile, attributes);
-
-                                session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file",
-                                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                                session.transfer(flowFile, REL_SUCCESS);
-                                getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
-                            }
+                if (consumed) {
+                    rolledOffFiles.remove(0);
+                }
+            } else if (tailingPostRollover && shouldTailPostRollover) {
+                // This condition is encountered when we are tailing a file post-rollover, and we've now reached the point where the rolled file
+                // has not changed.
+                final List<File> allRolledFiles = getRolledOffFiles(context, 0L, tailFile);
+                allRolledFiles.sort(Comparator.comparing(File::lastModified).reversed());
+                final File newestFile = allRolledFiles.get(0);
+
+                // If we don't notice that the file has been modified, per the checks above, then we want to keep checking until the last modified
+                // date has eclipsed the configured value for the Post-Rollover Tail Period. Until then, return false. Once that occurs, we will
+                // consume the rest of the data, including the last line, even if it doesn't have a line ending.
+                final long millisSinceModified = System.currentTimeMillis() - newestFile.lastModified();
+                if (millisSinceModified < postRolloverTailMillis) {
+                    getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " +
+                        "account. Will do nothing with file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
+                    return true;
+                }
 
-                            // We need to update the state to account for the fact that we just brought data in.
-                            // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
-                            // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
-                            // updated values.
-                            // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
-                            final long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
-                            final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
-                            if (postRolloverTailMillis > 0 && millisSinceUpdate < postRolloverTailMillis) {
-                                getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured {} ({} ms), so will continue tailing",
-                                    firstFile, millisSinceUpdate, POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
-
-                                final long length = currentState.getLength() + flowFile.getSize();
-                                final long updatedPosition = position + flowFile.getSize();
-                                final TailFileState updatedState = new TailFileState(currentState.getFilename(), currentState.getFile(), channel, updatedPosition, timestamp, length, checksum,
-                                    buffer);
-
-                                tfo.setState(updatedState);
-                                persistState(tfo, session, context);
-                            } else {
-                                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
-                                cleanup();
-                                tfo.setState(new TailFileState(tailFile, null, null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, tfo.getState().getBuffer()));
-                                persistState(tfo, session, context);
-                            }
-                        } else {
-                            getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
-                                    new Object[]{firstFile, checksumResult, expectedChecksum});
-                        }
-                    }
+                // The file has been rolled over and is no longer being written to. Consume all the way to the end of the file, including the last line,
+                // even if it does not have a newline after it.
+                final boolean consumed = tailRolledFile(context, session, tailFile, expectedChecksum, position, tfo, newestFile, true, false);
+                if (consumed) {
+                    getLogger().debug("Consumed the final data from {}", newestFile);
+                    rolledOffFiles.remove(newestFile);
+                } else {
+                    getLogger().debug("No more data to consume from {} (size={}, lastModified={})", newestFile, newestFile.length(), newestFile.lastModified());
                 }
             }
 
@@ -1291,6 +1277,87 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
+    private boolean tailRolledFile(final ProcessContext context, final ProcessSession session, final String tailFile, final Long expectedChecksum,
+                                final long position, final TailFileObject tfo, final File fileToTail, final boolean readFully, final boolean tailingPostRollover) throws IOException {
+
+        final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
+        final long startNanos = System.nanoTime();
+
+        try (final FileInputStream fis = new FileInputStream(fileToTail);
+             final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
+            StreamUtils.copy(in, new NullOutputStream(), position);
+
+            final long checksumResult = in.getChecksum().getValue();
+            if (checksumResult != expectedChecksum) {
+                getLogger().debug("Checksum for {} did not match expected checksum. Checksum for file was {} but expected {}. Will consume entire file",
+                    new Object[]{fileToTail, checksumResult, expectedChecksum});
+
+                return false;
+            }
+
+            getLogger().debug("Checksum for {} matched expected checksum. Will skip first {} bytes", new Object[]{fileToTail, position});
+
+            // This is the same file that we were reading when we shutdown. Start reading from this point on.
+            FlowFile flowFile = session.create();
+
+            final TailFileState currentState = tfo.getState();
+            final Checksum checksum = currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
+            final ByteBuffer buffer = currentState.getBuffer() == null ? ByteBuffer.allocate(65536) : currentState.getBuffer();
+            final FileChannel channel = fis.getChannel();
+            final long timestamp = fileToTail.lastModified();
+
+            try {
+                flowFile = session.write(flowFile, out -> readLines(channel, buffer, out, checksum, reReadOnNul, readFully));
+            } catch (NulCharacterEncounteredException ncee) {
+                session.remove(flowFile);
+                throw ncee;
+            }
+
+            if (flowFile.getSize() == 0L) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(3);
+                attributes.put(CoreAttributes.FILENAME.key(), fileToTail.getName());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+                attributes.put("tailfile.original.path", tailFile);
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.getProvenanceReporter().receive(flowFile, fileToTail.toURI().toString(), "FlowFile contains bytes 0 through " + position + " of source file",
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, fileToTail});
+            }
+
+            // We need to update the state to account for the fact that we just brought data in.
+            // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+            // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+            // updated values.
+            // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+            final long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
+            if (tailingPostRollover && postRolloverTailMillis > 0) {
+                getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured {} ({} ms), so will continue tailing",
+                    fileToTail, millisSinceUpdate, POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
+
+                final long length = currentState.getLength() + flowFile.getSize();
+                final long updatedPosition = position + flowFile.getSize();
+                final TailFileState updatedState = new TailFileState(currentState.getFilename(), currentState.getFile(), channel, updatedPosition, timestamp, length, checksum,
+                    buffer, tailingPostRollover);
+
+                tfo.setState(updatedState);
+            } else {
+                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
+                getLogger().debug("Completed tailing of file {}; will cleanup state", tailFile);
+                cleanup();
+                tfo.setState(new TailFileState(tailFile, null, null, 0L, fileToTail.lastModified() + 1L, fileToTail.length(), null, tfo.getState().getBuffer(), tailingPostRollover));
+            }
+
+            persistState(tfo, session, context);
+            return true;
+        }
+    }
+
+
     /**
      * Creates a new FlowFile that contains the entire contents of the given
      * file and transfers that FlowFile to success. This method will commit the
@@ -1332,18 +1399,13 @@ public class TailFile extends AbstractProcessor {
 
     static class TailFileObject {
 
-        private TailFileState state = new TailFileState(null, null, null, 0L, 0L, 0L, null, ByteBuffer.allocate(65536));
+        private TailFileState state;
         private Long expectedRecoveryChecksum;
         private int filenameIndex;
         private boolean tailFileChanged = true;
 
-        public TailFileObject(int index) {
-            this.filenameIndex = index;
-        }
-
-        public TailFileObject(final int index, final TailFileState fileState, final boolean tailFileChanged) {
+        public TailFileObject(final int index, final TailFileState fileState) {
             this.filenameIndex = index;
-            this.tailFileChanged = true;
             this.state = fileState;
         }
 
@@ -1352,20 +1414,17 @@ public class TailFile extends AbstractProcessor {
             this.tailFileChanged = false;
             final String prefix = MAP_PREFIX + index + '.';
             final String filename = statesMap.get(prefix + TailFileState.StateKeys.FILENAME);
-            final long position = Long.valueOf(statesMap.get(prefix + TailFileState.StateKeys.POSITION));
-            final long timestamp = Long.valueOf(statesMap.get(prefix + TailFileState.StateKeys.TIMESTAMP));
-            final long length = Long.valueOf(statesMap.get(prefix + TailFileState.StateKeys.LENGTH));
-            this.state = new TailFileState(filename, new File(filename), null, position, timestamp, length, null, ByteBuffer.allocate(65536));
+            final long position = Long.parseLong(statesMap.get(prefix + TailFileState.StateKeys.POSITION));
+            final long timestamp = Long.parseLong(statesMap.get(prefix + TailFileState.StateKeys.TIMESTAMP));
+            final long length = Long.parseLong(statesMap.get(prefix + TailFileState.StateKeys.LENGTH));
+            final boolean tailingPostRollover = Boolean.parseBoolean(prefix + StateKeys.TAILING_POST_ROLLOVER);
+            this.state = new TailFileState(filename, new File(filename), null, position, timestamp, length, null, ByteBuffer.allocate(65536), tailingPostRollover);
         }
 
         public int getFilenameIndex() {
             return filenameIndex;
         }
 
-        public void setFilenameIndex(int filenameIndex) {
-            this.filenameIndex = filenameIndex;
-        }
-
         public TailFileState getState() {
             return state;
         }
@@ -1406,17 +1465,24 @@ public class TailFile extends AbstractProcessor {
         private final long length;
         private final Checksum checksum;
         private final ByteBuffer buffer;
+        private final boolean tailingPostRollover;
 
-        private static class StateKeys {
+        static class StateKeys {
             public static final String FILENAME = "filename";
             public static final String POSITION = "position";
             public static final String TIMESTAMP = "timestamp";
             public static final String CHECKSUM = "checksum";
             public static final String LENGTH = "length";
+            public static final String TAILING_POST_ROLLOVER = "tailingPostRollover";
+        }
+
+        public TailFileState(final String filename, final File file, final FileChannel reader, final long position, final long timestamp,
+                             final long length, final Checksum checksum, final ByteBuffer buffer) {
+            this(filename, file, reader, position, timestamp, length, checksum, buffer, false);
         }
 
-        public TailFileState(final String filename, final File file, final FileChannel reader,
-                final long position, final long timestamp, final long length, final Checksum checksum, final ByteBuffer buffer) {
+        public TailFileState(final String filename, final File file, final FileChannel reader, final long position, final long timestamp,
+                             final long length, final Checksum checksum, final ByteBuffer buffer, final boolean tailingPostRollover) {
             this.filename = filename;
             this.file = file;
             this.reader = reader;
@@ -1425,6 +1491,7 @@ public class TailFile extends AbstractProcessor {
             this.timestamp = timestamp; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
             this.checksum = checksum;
             this.buffer = buffer;
+            this.tailingPostRollover = tailingPostRollover;
         }
 
         public String getFilename() {
@@ -1459,9 +1526,14 @@ public class TailFile extends AbstractProcessor {
             return buffer;
         }
 
+        public boolean isTailingPostRollover() {
+            return tailingPostRollover;
+        }
+
         @Override
         public String toString() {
-            return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) + "]";
+            return "TailFileState[filename=" + filename + ", position=" + position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? "null" : checksum.getValue()) +
+                ", tailingPostRollover=" + tailingPostRollover + "]";
         }
 
         public Map<String, String> toStateMap(int index) {
@@ -1472,6 +1544,7 @@ public class TailFile extends AbstractProcessor {
             map.put(prefix + StateKeys.LENGTH, String.valueOf(length));
             map.put(prefix + StateKeys.TIMESTAMP, String.valueOf(timestamp));
             map.put(prefix + StateKeys.CHECKSUM, checksum == null ? null : String.valueOf(checksum.getValue()));
+            map.put(prefix + StateKeys.TAILING_POST_ROLLOVER, String.valueOf(tailingPostRollover));
             return map;
         }
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index a62fbea..c279c3f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -347,6 +347,7 @@ public class TestTailFile {
         // Create the new file
         final RandomAccessFile newFile = new RandomAccessFile(new File("target/log.txt"), "rw");
         newFile.write("new file\n".getBytes()); // This should not get consumed until the old file's last modified date indicates it's complete
+        newFile.close();
 
         // Trigger processor and verify data is consumed properly
         runner.run(1, false, false);
@@ -355,8 +356,8 @@ public class TestTailFile {
         runner.clearTransferState();
 
         // Write to the file and trigger again.
-        raf.write("e\n".getBytes());
-        System.out.println("Wrote e\\n");
+        raf.write("e\nf".getBytes());
+        System.out.println("Wrote e\\nf");
         runner.run(1, false, false);
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
@@ -376,8 +377,9 @@ public class TestTailFile {
         runner.run(1, false, false);
 
         // Verify results
-        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
-        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new file\n");
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new file\n");
         runner.clearTransferState();
 
         raf.close();