You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/08 16:44:58 UTC
nifi git commit: NIFI-4081 - Added raw message option in GrokReader
This closes #1921.
Repository: nifi
Updated Branches:
refs/heads/master 1f67cbf62 -> 655960445
NIFI-4081 - Added raw message option in GrokReader
This closes #1921.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/65596044
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/65596044
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/65596044
Branch: refs/heads/master
Commit: 65596044563bd8217a1fcfcd9bfafa02f2f13160
Parents: 1f67cbf
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Jun 16 00:09:45 2017 +0200
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Sep 8 12:44:37 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/grok/GrokReader.java | 6 ++-
.../org/apache/nifi/grok/GrokRecordReader.java | 15 ++++--
.../apache/nifi/grok/TestGrokRecordReader.java | 50 ++++++++++++++++----
3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 30c7dd3..4a26975 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -61,7 +61,8 @@ import io.thekraken.grok.api.exception.GrokException;
+ "If a line in the input does not match the expected message pattern, the line of text is either considered to be part of the previous "
+ "message or is skipped, depending on the configuration, with the exception of stack traces. A stack trace that is found at the end of "
+ "a log message is considered to be part of the previous message but is added to the 'stackTrace' field of the Record. If a record has "
- + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String).")
+ + "no stack trace, it will have a NULL value for the stackTrace field (assuming that the schema does in fact include a stackTrace field of type String). "
+ + "Assuming that the schema includes a '_raw' field of type String, the raw message will be included in the Record.")
public class GrokReader extends SchemaRegistryService implements RecordReaderFactory {
private volatile Grok grok;
private volatile boolean appendUnmatchedLine;
@@ -150,6 +151,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
populateSchemaFieldNames(grok, grokExpression, fields);
fields.add(new RecordField(GrokRecordReader.STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField(GrokRecordReader.RAW_MESSAGE_NAME, RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
return schema;
@@ -241,4 +243,4 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
final RecordSchema schema = getSchema(variables, in, null);
return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index 65edf05..e7d81e4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -50,6 +50,8 @@ public class GrokRecordReader implements RecordReader {
private String nextLine;
static final String STACK_TRACE_COLUMN_NAME = "stackTrace";
+ static final String RAW_MESSAGE_NAME = "_raw";
+
private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
"^\\s*(?:(?: |\\t)+at )|"
+ "(?:(?: |\\t)+\\[CIRCULAR REFERENCE\\:)|"
@@ -73,8 +75,11 @@ public class GrokRecordReader implements RecordReader {
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
Map<String, Object> valueMap = null;
+ StringBuilder raw = new StringBuilder();
+
while (valueMap == null || valueMap.isEmpty()) {
final String line = nextLine == null ? reader.readLine() : nextLine;
+ raw.append(line);
nextLine = null; // ensure that we don't process nextLine again
if (line == null) {
return null;
@@ -98,9 +103,11 @@ public class GrokRecordReader implements RecordReader {
// the stack trace ends. Otherwise, append the next line to the last field in the record.
if (isStartOfStackTrace(nextLine)) {
stackTrace = readStackTrace(nextLine);
+ raw.append("\n").append(stackTrace);
break;
} else if (append) {
trailingText.append("\n").append(nextLine);
+ raw.append("\n").append(nextLine);
}
} else {
// The next line matched our pattern.
@@ -108,11 +115,11 @@ public class GrokRecordReader implements RecordReader {
}
}
- final Record record = createRecord(valueMap, trailingText, stackTrace, coerceTypes, dropUnknownFields);
+ final Record record = createRecord(valueMap, trailingText, stackTrace, raw.toString(), coerceTypes, dropUnknownFields);
return record;
}
- private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, final boolean dropUnknown) {
+ private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final String raw, final boolean coerceTypes, final boolean dropUnknown) {
final Map<String, Object> converted = new HashMap<>();
for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
final String fieldName = entry.getKey();
@@ -179,6 +186,8 @@ public class GrokRecordReader implements RecordReader {
}
converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+ converted.put(RAW_MESSAGE_NAME, raw);
+
return new MapRecord(schema, converted);
}
@@ -257,4 +266,4 @@ public class GrokRecordReader implements RecordReader {
return schema;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/65596044/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
index b849c0a..83286dc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokRecordReader.java
@@ -52,19 +52,24 @@ public class TestGrokRecordReader {
final String[] logLevels = new String[] {"INFO", "WARN", "ERROR", "FATAL", "FINE"};
final String[] messages = new String[] {"Test Message 1", "Red", "Green", "Blue", "Yellow"};
+ final String[] rawMessages = new String[] {"2016-11-08 21:24:23,029 INFO Test Message 1",
+ "2016-11-08 21:24:23,029 WARN Red", "2016-11-08 21:24:23,029 ERROR Green",
+ "2016-11-08 21:24:23,029 FATAL Blue", "2016-11-08 21:24:23,029 FINE Yellow"};
for (int i = 0; i < logLevels.length; i++) {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
- assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
+ assertEquals(5, values.length); // values[] contains 5 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE
assertEquals("2016-11-08 21:24:23,029", values[0]);
assertEquals(logLevels[i], values[1]);
assertEquals(messages[i], values[2]);
assertNull(values[3]);
+ assertEquals(rawMessages[i], values[4]);
}
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
@@ -83,13 +88,16 @@ public class TestGrokRecordReader {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
- assertEquals(6, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
+ assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals("2016-08-04 13:26:32,473", values[0]);
assertEquals("INFO", values[1]);
assertEquals("Leader Election Notification Thread-1", values[2]);
assertEquals("o.a.n.LoggerClass", values[3]);
assertEquals("", values[4]);
assertEquals("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces", values[5]);
+ assertEquals(msg, values[6]);
+
+ deserializer.close();
}
@@ -109,12 +117,14 @@ public class TestGrokRecordReader {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
- assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE
+ assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
assertNull(values[5]);
+ assertNotNull(values[6]);
}
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
@@ -134,18 +144,23 @@ public class TestGrokRecordReader {
final Object[] values = record.getValues();
assertNotNull(values);
- assertEquals(6, values.length); // values[] contains 6 elements: timestamp, level, thread, class, message, STACK_TRACE
+ assertEquals(7, values.length); // values[] contains 7 elements: timestamp, level, thread, class, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
if ("ERROR".equals(values[1])) {
final String msg = (String) values[4];
assertEquals("One\nTwo\nThree", msg);
assertNotNull(values[5]);
+ assertTrue(values[6].toString().startsWith("2016-08-04 13:26:32,474 ERROR [Leader Election Notification Thread-2] o.apache.nifi.controller.FlowController One"));
+ assertTrue(values[6].toString().endsWith(" at org.apache.nifi.cluster."
+ + "coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress(NodeClusterCoordinator.java:185) "
+ + "[nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]\n ... 12 common frames omitted"));
} else {
assertNull(values[5]);
}
}
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
@@ -168,9 +183,10 @@ public class TestGrokRecordReader {
final Object[] values = deserializer.nextRecord().getValues();
assertNotNull(values);
- assertEquals(4, values.length); // values[] contains 4 elements: timestamp, level, message, STACK_TRACE
+ assertEquals(5, values.length); // values[] contains 5 elements: timestamp, level, message, STACK_TRACE, RAW_MESSAGE
assertEquals(logLevels[i], values[1]);
assertEquals(messages[i], values[2]);
+ assertNotNull(values[4]);
if (values[1].equals("ERROR")) {
final String stackTrace = (String) values[3];
@@ -182,10 +198,21 @@ public class TestGrokRecordReader {
assertTrue(stackTrace.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+ "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
assertTrue(stackTrace.endsWith(" ... 12 common frames omitted"));
+
+ final String raw = (String) values[4];
+ assertTrue(raw.startsWith("2016-11-23 16:00:02,689 ERROR Log message with stack trace"));
+ assertTrue(raw.contains("org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"));
+ assertTrue(raw.contains(" at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+ + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
+ assertTrue(raw.contains("Caused by: org.apache.nifi.exception.UnitTestException: Testing to ensure we are able to capture stack traces"));
+ assertTrue(raw.contains("at org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator.getElectedActiveCoordinatorAddress("
+ + "NodeClusterCoordinator.java:185) [nifi-framework-cluster-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]"));
+ assertTrue(raw.endsWith(" ... 12 common frames omitted"));
}
}
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
@@ -201,7 +228,7 @@ public class TestGrokRecordReader {
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
- assertEquals(8, fieldNames.size());
+ assertEquals(9, fieldNames.size());
assertTrue(fieldNames.contains("timestamp"));
assertTrue(fieldNames.contains("logsource"));
assertTrue(fieldNames.contains("facility"));
@@ -210,6 +237,7 @@ public class TestGrokRecordReader {
assertTrue(fieldNames.contains("pid"));
assertTrue(fieldNames.contains("message"));
assertTrue(fieldNames.contains("stackTrace")); // always implicitly there
+ assertTrue(fieldNames.contains("_raw")); // always implicitly there
final GrokRecordReader deserializer = new GrokRecordReader(in, grok, schema, schema, true);
final Record record = deserializer.nextRecord();
@@ -221,8 +249,10 @@ public class TestGrokRecordReader {
assertEquals("nifi", record.getValue("program"));
assertEquals("12345", record.getValue("pid"));
assertEquals("My Message", record.getValue("message"));
+ assertEquals("May 22 15:58:23 my-host nifi[12345]:My Message", record.getValue("_raw"));
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
@@ -241,7 +271,7 @@ public class TestGrokRecordReader {
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
- assertEquals(6, fieldNames.size());
+ assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
assertTrue(fieldNames.contains("second"));
assertTrue(fieldNames.contains("third"));
@@ -258,6 +288,7 @@ public class TestGrokRecordReader {
assertEquals("5", record.getValue("fifth"));
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
@@ -276,7 +307,7 @@ public class TestGrokRecordReader {
final RecordSchema schema = GrokReader.createRecordSchema(grok);
final List<String> fieldNames = schema.getFieldNames();
- assertEquals(6, fieldNames.size());
+ assertEquals(7, fieldNames.size());
assertTrue(fieldNames.contains("first"));
assertTrue(fieldNames.contains("second"));
assertTrue(fieldNames.contains("third"));
@@ -295,6 +326,7 @@ public class TestGrokRecordReader {
}
assertNull(deserializer.nextRecord());
+ deserializer.close();
}
}
-}
+}
\ No newline at end of file