You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/07/26 18:26:42 UTC

[GitHub] [nifi] markap14 opened a new pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

markap14 opened a new pull request #5251:
URL: https://github.com/apache/nifi/pull/5251


   …e encountered in the tailed file will be buffered (up to some configurable max) until the subsequent message arrives. At that point, the previous message will be flushed.
   
   <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
         http://www.apache.org/licenses/LICENSE-2.0
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
   -->
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 commented on a change in pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681197130



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -251,6 +257,30 @@
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor LINE_START_REGEX = new Builder()
+        .name("Line Start Regex")
+        .displayName("Line Start Regex")

Review comment:
       Yeah, good call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 commented on a change in pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681194673



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -833,6 +834,73 @@ public void testConsumeWhenNewLineFound() throws IOException, InterruptedExcepti
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
     }
 
+    @Test
+    public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws IOException {
+        testMultiLineWaitsForRegexMatch(true);
+    }
+
+    @Test
+    public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() throws IOException {
+        testMultiLineWaitsForRegexMatch(false);
+    }
+
+    private void testMultiLineWaitsForRegexMatch(final boolean shutdownBetweenReads) throws IOException {
+        runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>");
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+        final String line1 = "<1>Hello, World\n";
+        final String line2 = "<2>Good-bye, World\n";
+        final String line3 = "<3>Start of multi-line\n";
+        final String line4 = "<4>Last One\n";
+
+        raf.write(line1.getBytes());
+        raf.write(line2.getBytes());
+
+        runner.run(1, shutdownBetweenReads, true);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        raf.write(line3.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        for (int i=0; i < 10; i++) {
+            System.out.println("i = " + i);

Review comment:
       Probably true.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory commented on a change in pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681108549



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -976,14 +1032,53 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
         }
     }
 
-    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
-        baos.writeTo(out);
+    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum, final boolean ignoreRegex) throws IOException {
         final byte[] baosBuffer = baos.toByteArray();
-        checksum.update(baosBuffer, 0, baos.size());
+        baos.reset();
+
+        // If the regular expression is being ignored, we need to flush anything that is buffered.
+        // This happens, for example, when a file has been rolled over. At that point, we want to flush whatever we have,
+        // even if the regex hasn't been matched.
+        if (ignoreRegex) {
+            flushLinesBuffer(out, checksum);
+        }
+
+        if (lineStartRegex == null) {
+            out.write(baosBuffer);
+
+            checksum.update(baosBuffer, 0, baosBuffer.length);
+            if (getLogger().isTraceEnabled()) {
+                getLogger().trace("Checksum updated to {}", checksum.getValue());
+            }
+
+            return;
+        }
+
+        final String bufferAsString = new String(baosBuffer, StandardCharsets.UTF_8);
+        final String[] lines = bufferAsString.split("\n");

Review comment:
       Should this be replaced with `System.lineSeparator()` or perhaps changed to a configurable property to work across different platforms?
   ```suggestion
           final String[] lines = bufferAsString.split(System.lineSeparator());
   ```

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -251,6 +257,30 @@
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor LINE_START_REGEX = new Builder()
+        .name("Line Start Regex")
+        .displayName("Line Start Regex")

Review comment:
       With the existing property named `Rolling Filename Pattern`, what do you think about naming this property `Line Start Pattern`? 

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -108,6 +113,7 @@
 public class TailFile extends AbstractProcessor {
 
     static final String MAP_PREFIX = "file.";
+    private static final byte[] NEW_LINE_BYTES = "\n".getBytes(StandardCharsets.UTF_8);

Review comment:
       Should this use `System.lineSeparator()`?

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -833,6 +834,73 @@ public void testConsumeWhenNewLineFound() throws IOException, InterruptedExcepti
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
     }
 
+    @Test
+    public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws IOException {
+        testMultiLineWaitsForRegexMatch(true);
+    }
+
+    @Test
+    public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() throws IOException {
+        testMultiLineWaitsForRegexMatch(false);
+    }
+
+    private void testMultiLineWaitsForRegexMatch(final boolean shutdownBetweenReads) throws IOException {
+        runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>");
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+        final String line1 = "<1>Hello, World\n";
+        final String line2 = "<2>Good-bye, World\n";
+        final String line3 = "<3>Start of multi-line\n";
+        final String line4 = "<4>Last One\n";
+
+        raf.write(line1.getBytes());
+        raf.write(line2.getBytes());
+
+        runner.run(1, shutdownBetweenReads, true);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        raf.write(line3.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        for (int i=0; i < 10; i++) {
+            System.out.println("i = " + i);

Review comment:
       Although this is helpful for diagnosing the test, it seems like it would be better to change the output to use a logger as opposed to `System.out`.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -847,8 +900,12 @@ public void process(final OutputStream rawOut) throws IOException {
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
             session.transfer(flowFile, REL_SUCCESS);
+            getLogger().debug("Created {} and routed to success", new Object[]{flowFile});

Review comment:
       The `Object[]` wrapper is no longer necessary, so this could be streamlined:
   ```suggestion
               getLogger().debug("Created {} and routed to success", flowFile);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 commented on a change in pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681196896



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -108,6 +113,7 @@
 public class TailFile extends AbstractProcessor {
 
     static final String MAP_PREFIX = "file.";
+    private static final byte[] NEW_LINE_BYTES = "\n".getBytes(StandardCharsets.UTF_8);

Review comment:
       No, same argument as above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 commented on a change in pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681196777



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -976,14 +1032,53 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
         }
     }
 
-    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
-        baos.writeTo(out);
+    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum, final boolean ignoreRegex) throws IOException {
         final byte[] baosBuffer = baos.toByteArray();
-        checksum.update(baosBuffer, 0, baos.size());
+        baos.reset();
+
+        // If the regular expression is being ignored, we need to flush anything that is buffered.
+        // This happens, for example, when a file has been rolled over. At that point, we want to flush whatever we have,
+        // even if the regex hasn't been matched.
+        if (ignoreRegex) {
+            flushLinesBuffer(out, checksum);
+        }
+
+        if (lineStartRegex == null) {
+            out.write(baosBuffer);
+
+            checksum.update(baosBuffer, 0, baosBuffer.length);
+            if (getLogger().isTraceEnabled()) {
+                getLogger().trace("Checksum updated to {}", checksum.getValue());
+            }
+
+            return;
+        }
+
+        final String bufferAsString = new String(baosBuffer, StandardCharsets.UTF_8);
+        final String[] lines = bufferAsString.split("\n");

Review comment:
       No - System.lineSeparator() is specific to the system that NiFi is running on. There is no reason to believe that the data would be populated with the same line separator. There are, generally, 2 possible line endings: \r\n and \n. If we use System.lineSeparator() and NiFi is run on Windows, and ingesting files written with \n, it will never detect a newline. On the other hand, if it splits based on \n, then whether the line ends with \r\n or \n, it will still be split.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory commented on a change in pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681212926



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -976,14 +1032,53 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
         }
     }
 
-    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
-        baos.writeTo(out);
+    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum, final boolean ignoreRegex) throws IOException {
         final byte[] baosBuffer = baos.toByteArray();
-        checksum.update(baosBuffer, 0, baos.size());
+        baos.reset();
+
+        // If the regular expression is being ignored, we need to flush anything that is buffered.
+        // This happens, for example, when a file has been rolled over. At that point, we want to flush whatever we have,
+        // even if the regex hasn't been matched.
+        if (ignoreRegex) {
+            flushLinesBuffer(out, checksum);
+        }
+
+        if (lineStartRegex == null) {
+            out.write(baosBuffer);
+
+            checksum.update(baosBuffer, 0, baosBuffer.length);
+            if (getLogger().isTraceEnabled()) {
+                getLogger().trace("Checksum updated to {}", checksum.getValue());
+            }
+
+            return;
+        }
+
+        final String bufferAsString = new String(baosBuffer, StandardCharsets.UTF_8);
+        final String[] lines = bufferAsString.split("\n");

Review comment:
       Thanks for the reply, that makes sense, and sounds like this approach should cover all platforms as described.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] asfgit closed pull request #5251: NIFI-8773: Implemented the 'Line Start Regex' capability. Each messag…

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #5251:
URL: https://github.com/apache/nifi/pull/5251


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org