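You are viewing a plain-text version of this content. The canonical version is available at the original mailing-list archive link (the hyperlink is not preserved in this plain-text rendering).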
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/08/08 12:48:30 UTC
[nifi] branch main updated: NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fdf3925f81 NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources
fdf3925f81 is described below
commit fdf3925f810e35bf5702b2abdb9a1ea966d5db0a
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jul 29 16:43:36 2022 -0500
NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #6260.
---
.../nifi/processors/standard/ExtractGrok.java | 63 ++++++++--------------
.../nifi/processors/standard/TestExtractGrok.java | 26 ++++++---
.../apache/nifi/grok/GrokExpressionValidator.java | 27 ++++------
.../main/java/org/apache/nifi/grok/GrokReader.java | 22 ++++----
.../java/org/apache/nifi/grok/TestGrokReader.java | 32 ++++++++++-
5 files changed, 91 insertions(+), 79 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
index 1a8aacfafa..90e3706eca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
@@ -44,19 +44,12 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
@@ -97,12 +90,13 @@ public class ExtractGrok extends AbstractProcessor {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
- public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor GROK_PATTERNS = new PropertyDescriptor.Builder()
.name("Grok Pattern file")
- .description("Grok Pattern file definition. This file will be loaded after the default Grok "
- + "patterns file. If not set, then only the Grok Expression and the default Grok patterns will be used.")
+ .displayName("Grok Patterns")
+ .description("Custom Grok pattern definitions. These definitions will be loaded after the default Grok "
+ + "patterns. The Grok Parser will use the default Grok patterns when this property is not configured.")
.required(false)
- .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
+ .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT, ResourceType.URL)
.build();
public static final PropertyDescriptor KEEP_EMPTY_CAPTURES = new PropertyDescriptor.Builder()
@@ -164,7 +158,6 @@ public class ExtractGrok extends AbstractProcessor {
private final static List<PropertyDescriptor> descriptors;
private final static Set<Relationship> relationships;
- private volatile GrokCompiler grokCompiler;
private volatile Grok grok;
private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
@@ -178,7 +171,7 @@ public class ExtractGrok extends AbstractProcessor {
final List<PropertyDescriptor> _descriptors = new ArrayList<>();
_descriptors.add(GROK_EXPRESSION);
- _descriptors.add(GROK_PATTERN_FILE);
+ _descriptors.add(GROK_PATTERNS);
_descriptors.add(DESTINATION);
_descriptors.add(CHARACTER_SET);
_descriptors.add(MAX_BUFFER_SIZE);
@@ -221,15 +214,13 @@ public class ExtractGrok extends AbstractProcessor {
try {
- try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(in);
+ try (final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
+ grokCompiler.register(defaultPatterns);
}
- if (validationContext.getProperty(GROK_PATTERN_FILE).isSet()) {
- try (final InputStream in = validationContext.getProperty(GROK_PATTERN_FILE).asResource().read();
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(reader);
+ if (validationContext.getProperty(GROK_PATTERNS).isSet()) {
+ try (final InputStream patterns = validationContext.getProperty(GROK_PATTERNS).asResource().read()) {
+ grokCompiler.register(patterns);
}
}
grok = grokCompiler.compile(input, namedCaptures);
@@ -258,17 +249,15 @@ public class ExtractGrok extends AbstractProcessor {
bufferQueue.add(buffer);
}
- grokCompiler = GrokCompiler.newInstance();
+ final GrokCompiler grokCompiler = GrokCompiler.newInstance();
- try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(in);
+ try (final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
+ grokCompiler.register(defaultPatterns);
}
- if (context.getProperty(GROK_PATTERN_FILE).isSet()) {
- try (final InputStream in = new FileInputStream(new File(context.getProperty(GROK_PATTERN_FILE).getValue()));
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(reader);
+ if (context.getProperty(GROK_PATTERNS).isSet()) {
+ try (final InputStream patterns = context.getProperty(GROK_PATTERNS).asResource().read()) {
+ grokCompiler.register(patterns);
}
}
grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean());
@@ -292,12 +281,7 @@ public class ExtractGrok extends AbstractProcessor {
try {
final byte[] byteBuffer = buffer;
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, byteBuffer, false);
- }
- });
+ session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false));
final long len = Math.min(byteBuffer.length, flowFile.getSize());
contentString = new String(byteBuffer, 0, (int) len, charset);
} finally {
@@ -310,7 +294,7 @@ public class ExtractGrok extends AbstractProcessor {
if (captureMap.isEmpty()) {
session.transfer(flowFile, REL_NO_MATCH);
- getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile});
+ getLogger().info("Did not match any Grok Expressions for FlowFile {}", flowFile);
return;
}
@@ -327,16 +311,11 @@ public class ExtractGrok extends AbstractProcessor {
flowFile = session.putAllAttributes(flowFile, grokResults);
session.getProvenanceReporter().modifyAttributes(flowFile);
session.transfer(flowFile, REL_MATCH);
- getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
+ getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", grokResults.size(), flowFile);
break;
case FLOWFILE_CONTENT:
- FlowFile conFlowfile = session.write(flowFile, new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- out.write(objectMapper.writeValueAsBytes(captureMap));
- }
- });
+ FlowFile conFlowfile = session.write(flowFile, outputStream -> objectMapper.writeValue(outputStream, captureMap));
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(conFlowfile, REL_MATCH);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
index bc62cb4c8e..54969150c3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
@@ -46,10 +46,22 @@ public class TestExtractGrok {
testRunner.assertNotValid();
}
+ @Test
+ public void testExtractGrokPatternsProperty() {
+ testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{USERNAME:username} - %{DATA}");
+ testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "USERNAME [a-zA-Z0-9._-]+");
+ testRunner.enqueue("admin - 127.0.0.1");
+ testRunner.run();
+ testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH);
+ final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0);
+
+ matched.assertAttributeEquals("grok.username","admin");
+ }
+
@Test
public void testExtractGrokWithMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
- testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+ testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH);
@@ -66,7 +78,7 @@ public class TestExtractGrok {
}
@Test
- public void testExtractGrokKeepEmptyCaptures() throws Exception {
+ public void testExtractGrokKeepEmptyCaptures() {
String expression = "%{NUMBER}|%{NUMBER}";
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION,expression);
testRunner.enqueue("-42");
@@ -77,7 +89,7 @@ public class TestExtractGrok {
}
@Test
- public void testExtractGrokDoNotKeepEmptyCaptures() throws Exception {
+ public void testExtractGrokDoNotKeepEmptyCaptures() {
String expression = "%{NUMBER}|%{NUMBER}";
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION,expression);
testRunner.setProperty(ExtractGrok.KEEP_EMPTY_CAPTURES,"false");
@@ -92,7 +104,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithUnMatchedContent() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{URI}");
- testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+ testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_TEXT_INPUT);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH);
@@ -103,7 +115,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithNotFoundPatternFile() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
- testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/toto_file");
+ testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "file:///src/test/resources/TestExtractGrok/file-not-found");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
@@ -111,7 +123,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithBadGrokExpression() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO");
- testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+ testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.assertNotValid();
}
@@ -119,7 +131,7 @@ public class TestExtractGrok {
@Test
public void testExtractGrokWithNamedCapturesOnly() throws IOException {
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
- testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+ testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true");
testRunner.enqueue(GROK_LOG_INPUT);
testRunner.run();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
index 7894b33835..0458cf07e5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
@@ -21,38 +21,29 @@ import io.krakens.grok.api.GrokCompiler;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.resource.ResourceReference;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
public class GrokExpressionValidator implements Validator {
- private GrokCompiler grokCompiler;
- private String patternFileName;
+ private final GrokCompiler grokCompiler;
+ private final ResourceReference patternsReference;
- public GrokExpressionValidator(String patternFileName, GrokCompiler compiler) {
- this.patternFileName = patternFileName;
+ public GrokExpressionValidator(ResourceReference patternsReference, GrokCompiler compiler) {
+ this.patternsReference = patternsReference;
this.grokCompiler = compiler;
}
- public GrokExpressionValidator() {
- this.grokCompiler = GrokCompiler.newInstance();
- }
-
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
try {
- try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME);
- final Reader reader = new InputStreamReader(in)) {
+ try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME)) {
grokCompiler.register(in);
}
- if (patternFileName != null) {
- try (final InputStream in = new FileInputStream(new File(patternFileName));
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(reader);
+ if (patternsReference != null) {
+ try (final InputStream patterns = patternsReference.read()) {
+ grokCompiler.register(patterns);
}
}
grokCompiler.compile(input);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 6d4aa182c0..80528281da 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -94,12 +94,13 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
+ "The schema will also include a `stackTrace` field, and a `_raw` field containing the input line string."
);
- static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor GROK_PATTERNS = new PropertyDescriptor.Builder()
.name("Grok Pattern File")
- .description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
- + "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
+ .displayName("Grok Patterns")
+ .description("Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+ + "will be used. If specified, all patterns specified will override the default patterns. See the Controller Service's "
+ "Additional Details for a list of pre-defined patterns.")
- .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
+ .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.URL, ResourceType.TEXT)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
@@ -130,7 +131,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
- properties.add(PATTERN_FILE);
+ properties.add(GROK_PATTERNS);
properties.add(GROK_EXPRESSION);
properties.add(NO_MATCH_BEHAVIOR);
return properties;
@@ -144,10 +145,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
grokCompiler.register(defaultPatterns);
}
- if (context.getProperty(PATTERN_FILE).isSet()) {
- try (final InputStream in = context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().asResource().read();
- final Reader reader = new InputStreamReader(in)) {
- grokCompiler.register(reader);
+ if (context.getProperty(GROK_PATTERNS).isSet()) {
+ try (final InputStream patterns = context.getProperty(GROK_PATTERNS).evaluateAttributeExpressions().asResource().read()) {
+ grokCompiler.register(patterns);
}
}
@@ -191,8 +191,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
.build());
}
- final String patternFileName = validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue();
- final GrokExpressionValidator validator = new GrokExpressionValidator(patternFileName, grokCompiler);
+ final ResourceReference patternsReference = validationContext.getProperty(GROK_PATTERNS).evaluateAttributeExpressions().asResource();
+ final GrokExpressionValidator validator = new GrokExpressionValidator(patternsReference, grokCompiler);
try {
final List<String> grokExpressions = readGrokExpressions(validationContext);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
index 0a5f3fcea9..1094eec71c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
@@ -112,7 +112,7 @@ public class TestGrokReader {
final GrokReader grokReader = new GrokReader();
runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
- runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
+ runner.setProperty(grokReader, GrokReader.GROK_PATTERNS, grokPatternFile);
runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
runner.enableControllerService(grokReader);
@@ -171,4 +171,34 @@ public class TestGrokReader {
assertNull(recordReader.nextRecord());
}
+
+ @Test
+ public void testPatternsProperty() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {
+ final String program = "NiFi";
+ final String level = "INFO";
+ final String message = "Processing Started";
+
+ final String logs = String.format("%s %s %s%n", program, level, message);
+ final byte[] bytes = logs.getBytes(StandardCharsets.UTF_8);
+
+ final String matchingExpression = "%{PROGRAM:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
+
+ final GrokReader grokReader = new GrokReader();
+ runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
+ runner.setProperty(grokReader, GrokReader.GROK_PATTERNS, "PROGRAM [a-zA-Z]+");
+ runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, matchingExpression);
+ runner.setProperty(grokReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, GrokReader.STRING_FIELDS_FROM_GROK_EXPRESSION);
+ runner.enableControllerService(grokReader);
+
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+ final RecordReader recordReader = grokReader.createRecordReader(Collections.emptyMap(), inputStream, bytes.length, runner.getLogger());
+
+ final Record firstRecord = recordReader.nextRecord();
+ assertNotNull(firstRecord);
+ assertEquals(program, firstRecord.getValue(PROGRAM_FIELD));
+ assertEquals(level, firstRecord.getValue(LEVEL_FIELD));
+ assertEquals(message, firstRecord.getValue(MESSAGE_FIELD));
+
+ assertNull(recordReader.nextRecord());
+ }
}