You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/11/04 21:32:27 UTC

[nifi] branch main updated: NIFI-10767: When an empty line is encountered with Syslog Readers, just skip over the empty line instead of throwing an Exception

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 381e0f84e1 NIFI-10767: When an empty line is encountered with Syslog Readers, just skip over the empty line instead of throwing an Exception
381e0f84e1 is described below

commit 381e0f84e122a7ff357ad08b7b8fe2c08fee0c24
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Nov 4 15:31:50 2022 -0400

    NIFI-10767: When an empty line is encountered with Syslog Readers, just skip over the empty line instead of throwing an Exception
    
    This closes #6623
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../apache/nifi/syslog/Syslog5424RecordReader.java | 33 +++++++++++++---------
 .../org/apache/nifi/syslog/SyslogRecordReader.java | 31 ++++++++++++--------
 .../apache/nifi/syslog/TestSyslogRecordReader.java | 12 ++------
 3 files changed, 42 insertions(+), 34 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
index 520c95fba1..be48f65904 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424RecordReader.java
@@ -26,6 +26,8 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
 import org.apache.nifi.syslog.events.Syslog5424Event;
 import org.apache.nifi.syslog.parsers.StrictSyslog5424Parser;
 import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -39,6 +41,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class Syslog5424RecordReader implements RecordReader {
+    private static final Logger logger = LoggerFactory.getLogger(Syslog5424RecordReader.class);
+
     private final BufferedReader reader;
     private RecordSchema schema;
     private final StrictSyslog5424Parser parser;
@@ -53,36 +57,39 @@ public class Syslog5424RecordReader implements RecordReader {
 
     @Override
     public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
-        String line = reader.readLine();
+        String line;
+        while (true) {
+            line = reader.readLine();
 
-        if ( line == null ) {
-            // a null return from readLine() signals the end of the stream
-            return null;
-        }
+            if (line == null) {
+                // a null return from readLine() signals the end of the stream
+                return null;
+            }
+
+            if (StringUtils.isBlank(line)) {
+                logger.debug("Encountered empty line, will skip");
+                continue;
+            }
 
-        if (StringUtils.isBlank(line)) {
-            // while an empty string is an error
-            throw new MalformedRecordException("Encountered a blank message!");
+            break;
         }
 
 
-        final MalformedRecordException malformedRecordException;
         Syslog5424Event event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
 
         if (!event.isValid()) {
             if (event.getException() != null) {
-                malformedRecordException = new MalformedRecordException(
+                throw new MalformedRecordException(
                         String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC "+
                                 "formats supported", line), event.getException());
             } else {
-                malformedRecordException = new MalformedRecordException(
+                throw new MalformedRecordException(
                         String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" +
                                 " formats supported", line));
             }
-            throw malformedRecordException;
         }
 
-        Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap());
+        final Map<String,Object> modifiedMap = new HashMap<>(event.getFieldMap());
         modifiedMap.put(SyslogAttributes.TIMESTAMP.key(),convertTimeStamp((String)event.getFieldMap().get(SyslogAttributes.TIMESTAMP.key())));
 
         if(includeRaw) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java
index 43ceab92ca..e92f296444 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogRecordReader.java
@@ -26,6 +26,8 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
 import org.apache.nifi.syslog.events.SyslogEvent;
 import org.apache.nifi.syslog.parsers.SyslogParser;
 import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -36,6 +38,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class SyslogRecordReader implements RecordReader {
+    private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class);
+
     private final BufferedReader reader;
     private RecordSchema schema;
     private final SyslogParser parser;
@@ -50,24 +54,27 @@ public class SyslogRecordReader implements RecordReader {
 
     @Override
     public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException {
-        String line = reader.readLine();
+        String line;
+        while (true) {
+            line = reader.readLine();
 
-        if (line == null) {
-            // a null return from readLine() signals the end of the stream
-            return null;
-        }
+            if (line == null) {
+                // a null return from readLine() signals the end of the stream
+                return null;
+            }
 
-        if (StringUtils.isBlank(line)) {
-            // while an empty string is an error
-            throw new MalformedRecordException("Encountered a blank message!");
-        }
+            if (StringUtils.isBlank(line)) {
+                logger.debug("Encountered empty line, will skip");
+                continue;
+            }
 
+            break;
+        }
 
-        final MalformedRecordException malformedRecordException;
-        SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
+        final SyslogEvent event = parser.parseEvent(ByteBuffer.wrap(line.getBytes(parser.getCharsetName())));
 
         if (!event.isValid()) {
-            malformedRecordException = new MalformedRecordException(
+            final MalformedRecordException malformedRecordException = new MalformedRecordException(
                     String.format("Failed to parse %s as a Syslog message: it does not conform to any of the RFC" +
                             " formats supported", line));
             throw malformedRecordException;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java
index 27751ac258..1af8a96cf8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/syslog/TestSyslogRecordReader.java
@@ -157,18 +157,12 @@ public class TestSyslogRecordReader {
 
             Record record = deserializer.nextRecord();
             int count = 0;
-            int exceptionCount = 0;
             while (record != null){
                 assertNotNull(record.getValues());
-                try {
-                    record = deserializer.nextRecord();
-                    count++;
-                } catch (Exception e) {
-                    exceptionCount++;
-                }
+                record = deserializer.nextRecord();
+                count++;
             }
-            assertEquals(count, 3);
-            assertEquals(exceptionCount,1);
+            assertEquals(3, count);
             deserializer.close();
         }
     }