You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2022/08/08 12:48:30 UTC

[nifi] branch main updated: NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new fdf3925f81 NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources
fdf3925f81 is described below

commit fdf3925f810e35bf5702b2abdb9a1ea966d5db0a
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Fri Jul 29 16:43:36 2022 -0500

    NIFI-3964 Updated Grok Patterns to support TEXT and URL Resources
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #6260.
---
 .../nifi/processors/standard/ExtractGrok.java      | 63 ++++++++--------------
 .../nifi/processors/standard/TestExtractGrok.java  | 26 ++++++---
 .../apache/nifi/grok/GrokExpressionValidator.java  | 27 ++++------
 .../main/java/org/apache/nifi/grok/GrokReader.java | 22 ++++----
 .../java/org/apache/nifi/grok/TestGrokReader.java  | 32 ++++++++++-
 5 files changed, 91 insertions(+), 79 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
index 1a8aacfafa..90e3706eca 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java
@@ -44,19 +44,12 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -97,12 +90,13 @@ public class ExtractGrok extends AbstractProcessor {
         .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
         .build();
 
-    public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor GROK_PATTERNS = new PropertyDescriptor.Builder()
         .name("Grok Pattern file")
-        .description("Grok Pattern file definition.  This file will be loaded after the default Grok "
-            + "patterns file.  If not set, then only the Grok Expression and the default Grok patterns will be used.")
+        .displayName("Grok Patterns")
+        .description("Custom Grok pattern definitions. These definitions will be loaded after the default Grok "
+            + "patterns. The Grok Parser will use the default Grok patterns when this property is not configured.")
         .required(false)
-        .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
+        .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.TEXT, ResourceType.URL)
         .build();
 
     public static final PropertyDescriptor KEEP_EMPTY_CAPTURES = new PropertyDescriptor.Builder()
@@ -164,7 +158,6 @@ public class ExtractGrok extends AbstractProcessor {
     private final static List<PropertyDescriptor> descriptors;
     private final static Set<Relationship> relationships;
 
-    private volatile GrokCompiler grokCompiler;
     private volatile Grok grok;
     private final BlockingQueue<byte[]> bufferQueue = new LinkedBlockingQueue<>();
 
@@ -178,7 +171,7 @@ public class ExtractGrok extends AbstractProcessor {
 
         final List<PropertyDescriptor> _descriptors = new ArrayList<>();
         _descriptors.add(GROK_EXPRESSION);
-        _descriptors.add(GROK_PATTERN_FILE);
+        _descriptors.add(GROK_PATTERNS);
         _descriptors.add(DESTINATION);
         _descriptors.add(CHARACTER_SET);
         _descriptors.add(MAX_BUFFER_SIZE);
@@ -221,15 +214,13 @@ public class ExtractGrok extends AbstractProcessor {
 
         try {
 
-            try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
-                 final Reader reader = new InputStreamReader(in)) {
-                grokCompiler.register(in);
+            try (final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
+                grokCompiler.register(defaultPatterns);
             }
 
-            if (validationContext.getProperty(GROK_PATTERN_FILE).isSet()) {
-                try (final InputStream in = validationContext.getProperty(GROK_PATTERN_FILE).asResource().read();
-                     final Reader reader = new InputStreamReader(in)) {
-                    grokCompiler.register(reader);
+            if (validationContext.getProperty(GROK_PATTERNS).isSet()) {
+                try (final InputStream patterns = validationContext.getProperty(GROK_PATTERNS).asResource().read()) {
+                    grokCompiler.register(patterns);
                 }
             }
             grok = grokCompiler.compile(input, namedCaptures);
@@ -258,17 +249,15 @@ public class ExtractGrok extends AbstractProcessor {
             bufferQueue.add(buffer);
         }
 
-        grokCompiler = GrokCompiler.newInstance();
+        final GrokCompiler grokCompiler = GrokCompiler.newInstance();
 
-        try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
-             final Reader reader = new InputStreamReader(in)) {
-            grokCompiler.register(in);
+        try (final InputStream defaultPatterns = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME)) {
+            grokCompiler.register(defaultPatterns);
         }
 
-        if (context.getProperty(GROK_PATTERN_FILE).isSet()) {
-            try (final InputStream in = new FileInputStream(new File(context.getProperty(GROK_PATTERN_FILE).getValue()));
-                 final Reader reader = new InputStreamReader(in)) {
-                grokCompiler.register(reader);
+        if (context.getProperty(GROK_PATTERNS).isSet()) {
+            try (final InputStream patterns = context.getProperty(GROK_PATTERNS).asResource().read()) {
+                grokCompiler.register(patterns);
             }
         }
         grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean());
@@ -292,12 +281,7 @@ public class ExtractGrok extends AbstractProcessor {
 
         try {
             final byte[] byteBuffer = buffer;
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, byteBuffer, false);
-                }
-            });
+            session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false));
             final long len = Math.min(byteBuffer.length, flowFile.getSize());
             contentString = new String(byteBuffer, 0, (int) len, charset);
         } finally {
@@ -310,7 +294,7 @@ public class ExtractGrok extends AbstractProcessor {
 
         if (captureMap.isEmpty()) {
             session.transfer(flowFile, REL_NO_MATCH);
-            getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile});
+            getLogger().info("Did not match any Grok Expressions for FlowFile {}", flowFile);
             return;
         }
 
@@ -327,16 +311,11 @@ public class ExtractGrok extends AbstractProcessor {
                 flowFile = session.putAllAttributes(flowFile, grokResults);
                 session.getProvenanceReporter().modifyAttributes(flowFile);
                 session.transfer(flowFile, REL_MATCH);
-                getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile});
+                getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", grokResults.size(), flowFile);
 
                 break;
             case FLOWFILE_CONTENT:
-                FlowFile conFlowfile = session.write(flowFile, new StreamCallback() {
-                    @Override
-                    public void process(InputStream in, OutputStream out) throws IOException {
-                        out.write(objectMapper.writeValueAsBytes(captureMap));
-                    }
-                });
+                FlowFile conFlowfile = session.write(flowFile, outputStream -> objectMapper.writeValue(outputStream, captureMap));
                 conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
                 session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                 session.transfer(conFlowfile, REL_MATCH);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
index bc62cb4c8e..54969150c3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java
@@ -46,10 +46,22 @@ public class TestExtractGrok {
         testRunner.assertNotValid();
     }
 
+    @Test
+    public void testExtractGrokPatternsProperty() {
+        testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{USERNAME:username} - %{DATA}");
+        testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "USERNAME [a-zA-Z0-9._-]+");
+        testRunner.enqueue("admin - 127.0.0.1");
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH);
+        final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0);
+
+        matched.assertAttributeEquals("grok.username","admin");
+    }
+
     @Test
     public void testExtractGrokWithMatchedContent() throws IOException {
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
-        testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+        testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
         testRunner.enqueue(GROK_LOG_INPUT);
         testRunner.run();
         testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH);
@@ -66,7 +78,7 @@ public class TestExtractGrok {
     }
 
     @Test
-    public void testExtractGrokKeepEmptyCaptures() throws Exception {
+    public void testExtractGrokKeepEmptyCaptures()  {
         String expression = "%{NUMBER}|%{NUMBER}";
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION,expression);
         testRunner.enqueue("-42");
@@ -77,7 +89,7 @@ public class TestExtractGrok {
     }
 
     @Test
-    public void testExtractGrokDoNotKeepEmptyCaptures() throws Exception {
+    public void testExtractGrokDoNotKeepEmptyCaptures() {
         String expression = "%{NUMBER}|%{NUMBER}";
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION,expression);
         testRunner.setProperty(ExtractGrok.KEEP_EMPTY_CAPTURES,"false");
@@ -92,7 +104,7 @@ public class TestExtractGrok {
     @Test
     public void testExtractGrokWithUnMatchedContent() throws IOException {
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{URI}");
-        testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+        testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
         testRunner.enqueue(GROK_TEXT_INPUT);
         testRunner.run();
         testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH);
@@ -103,7 +115,7 @@ public class TestExtractGrok {
     @Test
     public void testExtractGrokWithNotFoundPatternFile() throws IOException {
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
-        testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/toto_file");
+        testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "file:///src/test/resources/TestExtractGrok/file-not-found");
         testRunner.enqueue(GROK_LOG_INPUT);
         testRunner.assertNotValid();
     }
@@ -111,7 +123,7 @@ public class TestExtractGrok {
     @Test
     public void testExtractGrokWithBadGrokExpression() throws IOException {
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO");
-        testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+        testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
         testRunner.enqueue(GROK_LOG_INPUT);
         testRunner.assertNotValid();
     }
@@ -119,7 +131,7 @@ public class TestExtractGrok {
     @Test
     public void testExtractGrokWithNamedCapturesOnly() throws IOException {
         testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}");
-        testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns");
+        testRunner.setProperty(ExtractGrok.GROK_PATTERNS, "src/test/resources/TestExtractGrok/patterns");
         testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true");
         testRunner.enqueue(GROK_LOG_INPUT);
         testRunner.run();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
index 7894b33835..0458cf07e5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
@@ -21,38 +21,29 @@ import io.krakens.grok.api.GrokCompiler;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.resource.ResourceReference;
 
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
 
 public class GrokExpressionValidator implements Validator {
-    private GrokCompiler grokCompiler;
-    private String patternFileName;
+    private final GrokCompiler grokCompiler;
+    private final ResourceReference patternsReference;
 
-    public GrokExpressionValidator(String patternFileName, GrokCompiler compiler) {
-        this.patternFileName = patternFileName;
+    public GrokExpressionValidator(ResourceReference patternsReference, GrokCompiler compiler) {
+        this.patternsReference = patternsReference;
         this.grokCompiler = compiler;
     }
 
-    public GrokExpressionValidator() {
-        this.grokCompiler = GrokCompiler.newInstance();
-    }
-
     @Override
     public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
         try {
-            try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME);
-                 final Reader reader = new InputStreamReader(in)) {
+            try (final InputStream in = getClass().getResourceAsStream(GrokReader.DEFAULT_PATTERN_NAME)) {
                 grokCompiler.register(in);
             }
 
-            if (patternFileName != null) {
-                try (final InputStream in = new FileInputStream(new File(patternFileName));
-                     final Reader reader = new InputStreamReader(in)) {
-                    grokCompiler.register(reader);
+            if (patternsReference != null) {
+                try (final InputStream patterns = patternsReference.read()) {
+                    grokCompiler.register(patterns);
                 }
             }
             grokCompiler.compile(input);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 6d4aa182c0..80528281da 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -94,12 +94,13 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
             + "The schema will also include a `stackTrace` field, and a `_raw` field containing the input line string."
     );
 
-    static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor GROK_PATTERNS = new PropertyDescriptor.Builder()
         .name("Grok Pattern File")
-        .description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
-            + "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
+        .displayName("Grok Patterns")
+        .description("Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+            + "will be used. If specified, all patterns specified will override the default patterns. See the Controller Service's "
             + "Additional Details for a list of pre-defined patterns.")
-        .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
+        .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.URL, ResourceType.TEXT)
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .required(false)
         .build();
@@ -130,7 +131,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
-        properties.add(PATTERN_FILE);
+        properties.add(GROK_PATTERNS);
         properties.add(GROK_EXPRESSION);
         properties.add(NO_MATCH_BEHAVIOR);
         return properties;
@@ -144,10 +145,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
             grokCompiler.register(defaultPatterns);
         }
 
-        if (context.getProperty(PATTERN_FILE).isSet()) {
-            try (final InputStream in = context.getProperty(PATTERN_FILE).evaluateAttributeExpressions().asResource().read();
-                 final Reader reader = new InputStreamReader(in)) {
-                grokCompiler.register(reader);
+        if (context.getProperty(GROK_PATTERNS).isSet()) {
+            try (final InputStream patterns = context.getProperty(GROK_PATTERNS).evaluateAttributeExpressions().asResource().read()) {
+                grokCompiler.register(patterns);
             }
         }
 
@@ -191,8 +191,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
                     .build());
         }
 
-        final String patternFileName = validationContext.getProperty(PATTERN_FILE).evaluateAttributeExpressions().getValue();
-        final GrokExpressionValidator validator = new GrokExpressionValidator(patternFileName, grokCompiler);
+        final ResourceReference patternsReference = validationContext.getProperty(GROK_PATTERNS).evaluateAttributeExpressions().asResource();
+        final GrokExpressionValidator validator = new GrokExpressionValidator(patternsReference, grokCompiler);
 
         try {
             final List<String> grokExpressions = readGrokExpressions(validationContext);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
index 0a5f3fcea9..1094eec71c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/grok/TestGrokReader.java
@@ -112,7 +112,7 @@ public class TestGrokReader {
 
         final GrokReader grokReader = new GrokReader();
         runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
-        runner.setProperty(grokReader, GrokReader.PATTERN_FILE, grokPatternFile);
+        runner.setProperty(grokReader, GrokReader.GROK_PATTERNS, grokPatternFile);
         runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, grokExpression);
         runner.enableControllerService(grokReader);
 
@@ -171,4 +171,34 @@ public class TestGrokReader {
 
         assertNull(recordReader.nextRecord());
     }
+
+    @Test
+    public void testPatternsProperty() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {
+        final String program = "NiFi";
+        final String level = "INFO";
+        final String message = "Processing Started";
+
+        final String logs = String.format("%s %s %s%n", program, level, message);
+        final byte[] bytes = logs.getBytes(StandardCharsets.UTF_8);
+
+        final String matchingExpression = "%{PROGRAM:program} %{LOGLEVEL:level} %{GREEDYDATA:message}";
+
+        final GrokReader grokReader = new GrokReader();
+        runner.addControllerService(GrokReader.class.getSimpleName(), grokReader);
+        runner.setProperty(grokReader, GrokReader.GROK_PATTERNS, "PROGRAM [a-zA-Z]+");
+        runner.setProperty(grokReader, GrokReader.GROK_EXPRESSION, matchingExpression);
+        runner.setProperty(grokReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, GrokReader.STRING_FIELDS_FROM_GROK_EXPRESSION);
+        runner.enableControllerService(grokReader);
+
+        final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+        final RecordReader recordReader = grokReader.createRecordReader(Collections.emptyMap(), inputStream, bytes.length, runner.getLogger());
+
+        final Record firstRecord = recordReader.nextRecord();
+        assertNotNull(firstRecord);
+        assertEquals(program, firstRecord.getValue(PROGRAM_FIELD));
+        assertEquals(level, firstRecord.getValue(LEVEL_FIELD));
+        assertEquals(message, firstRecord.getValue(MESSAGE_FIELD));
+
+        assertNull(recordReader.nextRecord());
+    }
 }