You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/08 16:44:58 UTC

nifi git commit: NIFI-4081 - Added raw message option in GrokReader This closes #1921.

Repository: nifi
Updated Branches:
  refs/heads/master 1f67cbf62 -> 655960445


NIFI-4081 - Added raw message option in GrokReader
This closes #1921.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65596044
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65596044
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65596044

Branch: refs/heads/master
Commit: 65596044563bd8217a1fcfcd9bfafa02f2f13160
Parents: 1f67cbf
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Jun 16 00:09:45 2017 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Sep 8 12:44:37 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/grok/GrokReader.java   |  6 ++-
 .../org/apache/nifi/grok/GrokRecordReader.java  | 15 ++++--
 .../apache/nifi/grok/TestGrokRecordReader.java  | 50 ++++++++++++++++----
 3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 30c7dd3..4a26975 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -61,7 +61,8 @@ import io.thekraken.grok.api.exception.GrokException;
     + "If a line in the input does not match the expected message pattern, the line of text is either considered to be part of the previous "
     + "message or is skipped, depending on the configuration, with the exception of stack traces. A stack trace that is found at the end of "
     + "a log message is considered to be part of the previous message but is added to the 'stackTrace' field of the Record. If a record has "
-    + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String).")
+    + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). "
+    + "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.")
 public class GrokReader extends SchemaRegistryService implements RecordReaderFactory {
     private volatile Grok grok;
     private volatile boolean appendUnmatchedLine;
@@ -150,6 +151,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
         populateSchemaFieldNames(grok, grokExpression, fields);
 
         fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType()));
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
         return schema;
@@ -241,4 +243,4 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
         final RecordSchema schema = getSchema(variables, in, null);
         return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index 65edf05..e7d81e4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -50,6 +50,8 @@ public class GrokRecordReader implements RecordReader {
     private String nextLine;
 
     static final String STACK_TRACE_COLUMN_NAME = "stackTrace";
+    static final String RAW_MESSAGE_NAME = "_raw";
+
     private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
         "^\\s*(?:(?:    |\\t)+at )|"
             + "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
@@ -73,8 +75,11 @@ public class GrokRecordReader implements RecordReader {
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         Map<String, Object> valueMap = null;
+        StringBuilder raw = new StringBuilder();
+
         while (valueMap == null || valueMap.isEmpty()) {
             final String line = nextLine == null ? reader.readLine() : nextLine;
+            raw.append(line);
             nextLine = null; // ensure that we don't process nextLine again
             if (line == null) {
                 return null;
@@ -98,9 +103,11 @@ public class GrokRecordReader implements RecordReader {
                 // the stack trace ends. Otherwise, append the next line to the last field in the record.
                 if (isStartOfStackTrace(nextLine)) {
                     stackTrace = readStackTrace(nextLine);
+                    raw.append("\n").append(stackTrace);
                     break;
                 } else if (append) {
                     trailingText.append("\n").append(nextLine);
+                    raw.append("\n").append(nextLine);
                 }
             } else {
                 // The next line matched our pattern.
@@ -108,11 +115,11 @@ public class GrokRecordReader implements RecordReader {
             }
         }
 
-        final Record record = createRecord(valueMap, trailingText, stackTrace, coerceTypes, dropUnknownFields);
+        final Record record = createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes, dropUnknownFields);
         return record;
     }
 
-    private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, final boolean dropUnknown) {
+    private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes, final boolean dropUnknown) {
         final Map<String, Object> converted = new HashMap<>();
         for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
             final String fieldName = entry.getKey();
@@ -179,6 +186,8 @@ public class GrokRecordReader implements RecordReader {
         }
 
         converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+        converted.put(RAW_MESSAGE_NAME, raw);
+
         return new MapRecord(schema, converted);
     }
 
@@ -257,4 +266,4 @@ public class GrokRecordReader implements RecordReader {
         return schema;
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index b849c0a..83286dc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -52,19 +52,24 @@ public class TestGrokRecordReader {
 
             final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
             final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
+            final String[] rawMessages = new String[] {"2016-11-08 21:24:23,029 INFO Test Message 1",
+                    "2016-11-08 21:24:23,029 WARN Red", "2016-11-08 21:24:23,029 ERROR Green",
+                    "2016-11-08 21:24:23,029 FATAL Blue", "2016-11-08 21:24:23,029 FINE Yellow"};
 
             for (int i = 0; i < logLevels.length; i++) {
                 final Object[] values = deserializer.nextRecord().getValues();
 
                 assertNotNull(values);
-                assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
+                assertEquals(5, values.length); // values[] contains 5 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE
                 assertEquals("2016-11-08 21:24:23,029", values[0]);
                 assertEquals(logLevels[i], values[1]);
                 assertEquals(messages[i], values[2]);
                 assertNull(values[3]);
+                assertEquals(rawMessages[i], values[4]);
             }
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
 
@@ -83,13 +88,16 @@ public class TestGrokRecordReader {
         final Object[] values = deserializer.nextRecord().getValues();
 
         assertNotNull(values);
-        assertEquals(6, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
+        assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
         assertEquals("2016-08-04 13:26:32,473", values[0]);
         assertEquals("INFO", values[1]);
         assertEquals("Leader Election Notification Thread-1", values[2]);
         assertEquals("o.a.n.LoggerClass", values[3]);
         assertEquals("", values[4]);
         assertEquals("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces", values[5]);
+        assertEquals(msg, values[6]);
+
+        deserializer.close();
     }
 
 
@@ -109,12 +117,14 @@ public class TestGrokRecordReader {
                 final Object[] values = deserializer.nextRecord().getValues();
 
                 assertNotNull(values);
-                assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE
+                assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
                 assertEquals(logLevels[i], values[1]);
                 assertNull(values[5]);
+                assertNotNull(values[6]);
             }
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
 
@@ -134,18 +144,23 @@ public class TestGrokRecordReader {
                 final Object[] values = record.getValues();
 
                 assertNotNull(values);
-                assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE
+                assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
                 assertEquals(logLevels[i], values[1]);
                 if ("ERROR".equals(values[1])) {
                     final String msg = (String) values[4];
                     assertEquals("One\nTwo\nThree", msg);
                     assertNotNull(values[5]);
+                    assertTrue(values[6].toString().startsWith("2016-08-04 13:26:32,474 ERROR [Leader Election Notification Thread-2] o.apache.nifi.controller.FlowController One"));
+                    assertTrue(values[6].toString().endsWith("    at org.apache.nifi.cluster."
+                            + "coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) "
+                            + "[nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]\n    ... 12 common frames omitted"));
                 } else {
                     assertNull(values[5]);
                 }
             }
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
 
@@ -168,9 +183,10 @@ public class TestGrokRecordReader {
                 final Object[] values = deserializer.nextRecord().getValues();
 
                 assertNotNull(values);
-                assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
+                assertEquals(5, values.length); // values[] contains 5 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE
                 assertEquals(logLevels[i], values[1]);
                 assertEquals(messages[i], values[2]);
+                assertNotNull(values[4]);
 
                 if (values[1].equals("ERROR")) {
                     final String stackTrace = (String) values[3];
@@ -182,10 +198,21 @@ public class TestGrokRecordReader {
                     assertTrue(stackTrace.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
                         + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
                     assertTrue(stackTrace.endsWith("    ... 12 common frames omitted"));
+
+                    final String raw = (String) values[4];
+                    assertTrue(raw.startsWith("2016-11-23 16:00:02,689 ERROR Log message with stack trace"));
+                    assertTrue(raw.contains("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"));
+                    assertTrue(raw.contains("        at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+                        + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
+                    assertTrue(raw.contains("Caused by: org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"));
+                    assertTrue(raw.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+                        + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
+                    assertTrue(raw.endsWith("    ... 12 common frames omitted"));
                 }
             }
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
 
@@ -201,7 +228,7 @@ public class TestGrokRecordReader {
 
             final RecordSchema schema = GrokReader.createRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
-            assertEquals(8, fieldNames.size());
+            assertEquals(9, fieldNames.size());
             assertTrue(fieldNames.contains("timestamp"));
             assertTrue(fieldNames.contains("logsource"));
             assertTrue(fieldNames.contains("facility"));
@@ -210,6 +237,7 @@ public class TestGrokRecordReader {
             assertTrue(fieldNames.contains("pid"));
             assertTrue(fieldNames.contains("message"));
             assertTrue(fieldNames.contains("stackTrace"));  // always implicitly there
+            assertTrue(fieldNames.contains("_raw"));  // always implicitly there
 
             final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, true);
             final Record record = deserializer.nextRecord();
@@ -221,8 +249,10 @@ public class TestGrokRecordReader {
             assertEquals("nifi", record.getValue("program"));
             assertEquals("12345", record.getValue("pid"));
             assertEquals("My Message", record.getValue("message"));
+            assertEquals("May 22 15:58:23 my-host nifi[12345]:My Message", record.getValue("_raw"));
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
 
@@ -241,7 +271,7 @@ public class TestGrokRecordReader {
 
             final RecordSchema schema = GrokReader.createRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
-            assertEquals(6, fieldNames.size());
+            assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
             assertTrue(fieldNames.contains("second"));
             assertTrue(fieldNames.contains("third"));
@@ -258,6 +288,7 @@ public class TestGrokRecordReader {
             assertEquals("5", record.getValue("fifth"));
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
 
@@ -276,7 +307,7 @@ public class TestGrokRecordReader {
 
             final RecordSchema schema = GrokReader.createRecordSchema(grok);
             final List<String> fieldNames = schema.getFieldNames();
-            assertEquals(6, fieldNames.size());
+            assertEquals(7, fieldNames.size());
             assertTrue(fieldNames.contains("first"));
             assertTrue(fieldNames.contains("second"));
             assertTrue(fieldNames.contains("third"));
@@ -295,6 +326,7 @@ public class TestGrokRecordReader {
             }
 
             assertNull(deserializer.nextRecord());
+            deserializer.close();
         }
     }
-}
+}
\ No newline at end of file