Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/03/19 19:52:51 UTC

[GitHub] [nifi] tpalfy commented on a change in pull request #4916: NIFI-8344: Introduced new Rollover Tail Period property

tpalfy commented on a change in pull request #4916:
URL: https://github.com/apache/nifi/pull/4916#discussion_r597880479



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -735,13 +757,15 @@ private void processTailFile(final ProcessContext context, final ProcessSession
         if (!rotated) {
             final long fileLength = file.length();
             if (length > fileLength) {
+                getLogger().debug("Rotated = true because TailFile State Length = {}, File Length = {}", length, fileLength);

Review comment:
       Minor (a bit confusing otherwise)
   ```suggestion
                   getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", length, fileLength);
   ```
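The debug line in this hunk fires when the length recorded in the processor's state is larger than the length of the file currently on disk, i.e. the tailed file appears to have shrunk, which is treated as a rotation. Below is a minimal, self-contained sketch of just that comparison. `RotationCheck`, `rotatedBecauseShrunk` and `describe` are hypothetical names; the sketch deliberately ignores the other rotation checks implied by the surrounding `if (!rotated)` guard.

```java
import java.util.Locale;

// Hypothetical sketch: models only the length comparison shown in the diff hunk,
// not the full rotation detection performed by TailFile.
class RotationCheck {

    /** True when state says we consumed more bytes than the file now holds. */
    static boolean rotatedBecauseShrunk(long stateLength, long fileLength) {
        return stateLength > fileLength;
    }

    // Mirrors the suggested debug message text, for illustration only.
    static String describe(long stateLength, long fileLength) {
        return String.format(Locale.ROOT,
                "Rotated = true because TailFileState Length = %d, File Length = %d",
                stateLength, fileLength);
    }

    public static void main(String[] args) {
        long stateLength = 8; // bytes consumed according to the saved state
        long fileLength = 3;  // a new, shorter file now sits at the tailed path
        if (rotatedBecauseShrunk(stateLength, fileLength)) {
            System.out.println(describe(stateLength, fileLength));
        }
    }
}
```

Writing `TailFileState` as one word in the message ties it to the state object rather than reading as "TailFile" followed by a separate "State".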

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -166,10 +168,21 @@
                     + "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. "
                     + "The same glob pattern will be used for all files.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(NONE)
             .required(false)
             .build();
 
+    static final PropertyDescriptor ROLLOVER_TAIL_PERIOD = new PropertyDescriptor.Builder()

Review comment:
       Minor: It may just be me, but "rollover" feels more like a verb than a noun/adjective to me, so "Rollover Tail Period" is a bit confusing. "Post-Rollover Tail Period" feels more natural and highlights the idea that we do something we usually do not, _after_ the rollover.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -691,6 +705,14 @@ private void processTailFile(final ProcessContext context, final ProcessSession
             }
 
             rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
+            if (rolloverOccurred) {
+                final boolean tailRollover = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 0;

Review comment:
       Minor: Similar to the previous comment.
   We could use `tailAfterRollover` or even `tailRolledOver` for example.
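With the suggested name, the flag reads as a plain statement of intent: a configured period greater than zero means rolled-over files keep being tailed after the rollover. A minimal sketch, assuming the period has already been converted to milliseconds (in the diff this comes from `context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS)`); `PostRolloverFlag` is a hypothetical class used only for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the suggested naming; not code from the PR.
class PostRolloverFlag {

    // A period of zero means rolled-over files are not tailed after the rollover.
    static boolean tailAfterRollover(long postRolloverTailMillis) {
        return postRolloverTailMillis > 0;
    }

    public static void main(String[] args) {
        long tenMinutes = TimeUnit.MINUTES.toMillis(10); // the test configures "10 mins"
        System.out.println(tailAfterRollover(tenMinutes)); // true
        System.out.println(tailAfterRollover(0L));         // false
    }
}
```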

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
                                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the fact that we just brought data in.
+                            // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+                            final long rolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);

Review comment:
       Minor
   ```suggestion
                               final long postRolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
   ```
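The code comment in this hunk describes two outcomes once data has been pulled from a rolled-over file: either the state keeps pointing at that same file with updated values, or it is essentially reset. The sketch below models only that branching. `TailState` is a hypothetical, simplified stand-in (it is NOT NiFi's `TailFileState` and omits checksum and length), and representing "reset" as position 0 on the tailed filename is an assumption.

```java
// Hypothetical, simplified model of the state update described in the code comment.
class PostRolloverState {

    static final class TailState {
        final String filename;
        final long position;
        final long timestamp;

        TailState(String filename, long position, long timestamp) {
            this.filename = filename;
            this.position = position;
            this.timestamp = timestamp;
        }
    }

    /**
     * If the rolled-over file is still within the post-rollover tail period, keep
     * pointing at it with the updated position/timestamp; otherwise reset the state.
     */
    static TailState nextState(TailState current, long newPosition, long newTimestamp,
                               boolean keepTailingRolledFile, String tailedFilename) {
        if (keepTailingRolledFile) {
            return new TailState(current.filename, newPosition, newTimestamp);
        }
        // Assumption: "reset" is modeled as starting from position 0 with no timestamp.
        return new TailState(tailedFilename, 0L, 0L);
    }

    public static void main(String[] args) {
        TailState rolled = new TailState("target/log.1", 6L, 1_000L);
        TailState kept = nextState(rolled, 8L, 2_000L, true, "target/log.txt");
        TailState reset = nextState(rolled, 8L, 2_000L, false, "target/log.txt");
        System.out.println(kept.filename + " @ " + kept.position);   // target/log.1 @ 8
        System.out.println(reset.filename + " @ " + reset.position); // target/log.txt @ 0
    }
}
```

Keeping the state on the rolled-over file is what lets the next iteration compare against the updated position and timestamp instead of re-reading from the start.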

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -311,6 +313,79 @@ private File rollover(final int index) throws IOException {
     }
 
 
+    @Test
+    public void testFileWrittenToAfterRollover() throws IOException, InterruptedException {
+        Assume.assumeTrue("Test requires renaming a file while a file handle is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
+        runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+        runner.setProperty(TailFile.ROLLOVER_TAIL_PERIOD, "10 mins");
+
+        // first line fully written, second partially

Review comment:
       I'd rather remove these comments. The code is clear enough, and some of them already seem outdated.
   Take this one: why do we consider the second line partial?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
                                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the fact that we just brought data in.
+                            // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+                            final long rolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                            final long millisSinceUpdate = System.currentTimeMillis() - firstFile.lastModified();

Review comment:
       Minor: Just to avoid any uncertainty about what `timestamp` refers to.
   ```suggestion
                               final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
   ```
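The condition that follows in this hunk keeps tailing the rolled-over file only while it was modified more recently than the configured period. A minimal sketch of that check, assuming `timestamp` holds the rolled-over file's last-modified time in epoch milliseconds (the value the original line reads via `firstFile.lastModified()`); `PostRolloverCheck` and `continueTailing` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the post-rollover check; not code from the PR.
class PostRolloverCheck {

    static boolean continueTailing(long postRolloverTailMillis, long timestamp, long nowMillis) {
        final long millisSinceUpdate = nowMillis - timestamp;
        // Disabled when the period is 0; otherwise only while the file is "fresh".
        return postRolloverTailMillis > 0 && millisSinceUpdate < postRolloverTailMillis;
    }

    public static void main(String[] args) {
        long period = TimeUnit.MINUTES.toMillis(10);
        long now = System.currentTimeMillis();
        System.out.println(continueTailing(period, now - TimeUnit.MINUTES.toMillis(1), now));  // true
        System.out.println(continueTailing(period, now - TimeUnit.MINUTES.toMillis(11), now)); // false
        System.out.println(continueTailing(0L, now, now));                                     // false
    }
}
```

Passing `nowMillis` in explicitly, rather than calling `System.currentTimeMillis()` inside, is only a convenience that makes the check easy to exercise deterministically.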

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -311,6 +313,79 @@ private File rollover(final int index) throws IOException {
     }
 
 
+    @Test
+    public void testFileWrittenToAfterRollover() throws IOException, InterruptedException {
+        Assume.assumeTrue("Test requires renaming a file while a file handle is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
+        runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+        runner.setProperty(TailFile.ROLLOVER_TAIL_PERIOD, "10 mins");
+
+        // first line fully written, second partially
+        raf.write("a\nb\n".getBytes());
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("a\nb\n");
+        runner.clearTransferState();
+
+        raf.write("c\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("c\n");
+        runner.clearTransferState();
+
+        // Write additional data to file, then roll file over
+        raf.write("d\n".getBytes());
+
+        final File rolledFile = new File("target/log.1");
+        final boolean renamed = file.renameTo(rolledFile);
+        assertTrue(renamed);
+        raf.getChannel().force(true);
+
+        System.out.println("Wrote d\\n and rolled file");
+
+        // Create the new file
+        final RandomAccessFile newFile = new RandomAccessFile(new File("target/log.txt"), "rw");
+        newFile.write("new file\n".getBytes()); // This should not get consumed until the old file's last modified date indicates it's complete
+
+        // Trigger processor and verify data is consumed properly
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("d\n");
+        runner.clearTransferState();
+
+        // Write to the file and trigger again.
+        raf.write("e\n".getBytes());
+        System.out.println("Wrote e\\n");
+        runner.run(1, false, false);
+
+        // There should be no data consumed because the last modified time is too recent.

Review comment:
       Another outdated(?) comment: "e" _has_ been consumed (although from the rolled-over file).

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
                                         TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the fact that we just brought data in.
+                            // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+                            final long rolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                            final long millisSinceUpdate = System.currentTimeMillis() - firstFile.lastModified();
+                            if (rolloverTailMillis > 0 && millisSinceUpdate < rolloverTailMillis) {
+                                getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured Rollover Tail Period, so will continue " +
+                                    "tailing", firstFile, millisSinceUpdate);

Review comment:
       ```suggestion
                                   
                                   getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured " + ROLLOVER_TAIL_PERIOD.getDisplayName() +
                                       " ({} ms), so will continue tailing", firstFile, millisSinceUpdate, rolloverTailMillis);
   ```
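To see how the suggested message reads once the display name is concatenated in and the `{}` placeholders are filled, here is a small sketch. `render` is a hypothetical, simplified stand-in for the logger's placeholder substitution (not NiFi's `ComponentLog` implementation), the display name "Rollover Tail Period" is assumed from the property as proposed, and the file name and values are made up for illustration.

```java
// Hypothetical sketch: renders the suggested log message for illustration only.
class RolloverLogMessage {

    // Replaces each "{}" with the next argument, left to right.
    static String render(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = template.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            out.append(template, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        return out.append(template.substring(from)).toString();
    }

    public static void main(String[] args) {
        String displayName = "Rollover Tail Period"; // assumed display name
        String template = "File {} has been rolled over, but it was updated {} millis ago, "
                + "which is less than the configured " + displayName
                + " ({} ms), so will continue tailing";
        System.out.println(render(template, "target/log.1", 1500L, 600000L));
    }
}
```

Taking the name from `ROLLOVER_TAIL_PERIOD.getDisplayName()` instead of hard-coding it would also keep the message in sync if the property is renamed (for example, to "Post-Rollover Tail Period").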




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org