You are viewing a plain text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/01/28 18:43:44 UTC

[nifi] branch main updated: NIFI-9341 Added CEF RecordReader

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 49978cd  NIFI-9341 Added CEF RecordReader
49978cd is described below

commit 49978cdd9167d12fd235888a17b45d5a5a2ee42a
Author: Bence Simon <si...@gmail.com>
AuthorDate: Wed Nov 17 17:45:12 2021 +0100

    NIFI-9341 Added CEF RecordReader
    
    This closes #5530
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi/processors/standard/TestParseCEF.java     |   5 +-
 .../nifi-record-serialization-services/pom.xml     |  37 ++
 .../nifi/cef/CEFCustomExtensionTypeResolver.java   |  38 ++
 .../main/java/org/apache/nifi/cef/CEFReader.java   | 241 ++++++++++
 .../java/org/apache/nifi/cef/CEFRecordReader.java  | 215 +++++++++
 .../java/org/apache/nifi/cef/CEFRecordSource.java  |  71 +++
 .../org/apache/nifi/cef/CEFSchemaInference.java    | 119 +++++
 .../apache/nifi/cef/CEFSchemaInferenceBuilder.java |  51 +++
 .../java/org/apache/nifi/cef/CEFSchemaUtil.java    | 100 +++++
 .../java/org/apache/nifi/cef/ValidateLocale.java   |  49 ++
 .../org/apache/nifi/csv/CSVSchemaInference.java    |  55 +--
 .../org/apache/nifi/util/SchemaInferenceUtil.java  |  86 ++++
 .../nifi/xml/inference/XmlSchemaInference.java     |  47 +-
 .../org.apache.nifi.controller.ControllerService   |   2 +
 .../additionalDetails.html                         | 199 +++++++++
 .../java/org/apache/nifi/cef/TestCEFReader.java    | 335 ++++++++++++++
 .../org/apache/nifi/cef/TestCEFRecordReader.java   | 497 +++++++++++++++++++++
 .../apache/nifi/cef/TestCEFSchemaInference.java    | 286 ++++++++++++
 .../test/java/org/apache/nifi/cef/TestCEFUtil.java | 128 ++++++
 .../apache/nifi/util/TestSchemaInferenceUtil.java  |  93 ++++
 .../src/test/resources/cef/empty-row.txt           |   0
 .../src/test/resources/cef/misformatted-row.txt    |   3 +
 ...ltiple-rows-decreasing-number-of-extensions.txt |   2 +
 ...ltiple-rows-increasing-number-of-extensions.txt |   2 +
 .../cef/multiple-rows-starting-with-empty-row.txt  |   8 +
 .../multiple-rows-with-different-custom-types.txt  |   2 +
 .../cef/multiple-rows-with-empty-rows.txt          |   4 +
 .../src/test/resources/cef/multiple-rows.txt       |   3 +
 .../cef/single-row-header-fields-only.txt          |   1 +
 .../cef/single-row-with-custom-extensions.txt      |   1 +
 .../single-row-with-empty-custom-extensions.txt    |   1 +
 .../cef/single-row-with-empty-extension.txt        |   1 +
 .../resources/cef/single-row-with-extensions.txt   |   1 +
 ...single-row-with-incorrect-custom-extensions.txt |   1 +
 .../cef/single-row-with-incorrect-header-field.txt |   1 +
 35 files changed, 2586 insertions(+), 99 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
index 9ec2e87..3e1945d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseCEF.java
@@ -346,7 +346,7 @@ public class TestParseCEF {
     @Test
     public void testAcceptEmptyExtensions() throws Exception {
         String sample3 = "CEF:0|TestVendor|TestProduct|TestVersion|TestEventClassID|TestName|Low|" +
-                "rt=Feb 09 2015 00:27:43 UTC cn3Label=Test Long cn3= " +
+                "rt=Feb 09 2015 00:27:43 UTC cn3Label= cn3= " +
                 "cfp1=1.234 cfp1Label=Test FP Number smac=00:00:0c:07:ac:00 " +
                 "c6a3=2001:cdba::3257:9652 c6a3Label=Test IPv6 cs1Label=Test String cs1=test test test chocolate " +
                 "destinationTranslatedAddress=123.123.123.123 " +
@@ -372,6 +372,9 @@ public class TestParseCEF {
         JsonNode extensions = results.get("extension");
         Assert.assertTrue(extensions.has("cn3"));
         Assert.assertTrue(extensions.get("cn3").isNull());
+
+        Assert.assertTrue(extensions.has("cn3Label"));
+        Assert.assertTrue(extensions.get("cn3Label").asText().isEmpty());
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index ae733ef..d245bab 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -120,10 +120,30 @@
             <version>2.8.1</version>
         </dependency>
         <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+            <version>2.0.1.Final</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>27.0.1-jre</version>
         </dependency>
+        <dependency>
+            <groupId>com.fluenda</groupId>
+            <artifactId>parcefone</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bval</groupId>
+            <artifactId>bval-jsr</artifactId>
+            <version>2.0.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
@@ -143,6 +163,23 @@
                         <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
                         <exclude>src/test/resources/avro/simple.avsc</exclude>
                         <exclude>src/test/resources/avro/recursive.avsc</exclude>
+
+                        <exclude>src/test/resources/cef/empty-row.txt</exclude>
+                        <exclude>src/test/resources/cef/misformatted-row.txt</exclude>
+                        <exclude>src/test/resources/cef/multiple-rows.txt</exclude>
+                        <exclude>src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt</exclude>
+                        <exclude>src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt</exclude>
+                        <exclude>src/test/resources/cef/multiple-rows-starting-with-empty-row.txt</exclude>
+                        <exclude>src/test/resources/cef/multiple-rows-with-different-custom-types.txt</exclude>
+                        <exclude>src/test/resources/cef/multiple-rows-with-empty-rows.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-header-fields-only.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-with-custom-extensions.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-with-empty-extension.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-with-empty-custom-extensions.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-with-extensions.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt</exclude>
+                        <exclude>src/test/resources/cef/single-row-with-incorrect-header-field.txt</exclude>
+
                         <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFCustomExtensionTypeResolver.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFCustomExtensionTypeResolver.java
new file mode 100644
index 0000000..bc1bbe1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFCustomExtensionTypeResolver.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.SchemaInferenceUtil;
+
+/**
+ * Strategy used while inferring a schema to pick the data type of a custom (non-standard) CEF extension field.
+ */
+interface CEFCustomExtensionTypeResolver {
+
+    /**
+     * Chooses a data type for a single custom extension value.
+     *
+     * @param value the textual form of the extension value
+     * @return the data type picked by this strategy; may be {@code null} (see {@link #SKIPPING_RESOLVER}).
+     *         NOTE(review): callers presumably treat {@code null} as "leave the field out" - confirm in CEFSchemaInference.
+     */
+    DataType resolve(String value);
+
+    /** Types every custom extension as a string, regardless of how the value looks. */
+    CEFCustomExtensionTypeResolver STRING_RESOLVER = value -> RecordFieldType.STRING.getDataType();
+
+    /** Lets {@link SchemaInferenceUtil} guess the data type from the value itself. */
+    CEFCustomExtensionTypeResolver SIMPLE_RESOLVER = SchemaInferenceUtil::getDataType;
+
+    /** Resolves every value to {@code null}. */
+    CEFCustomExtensionTypeResolver SKIPPING_RESOLVER = value -> null;
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java
new file mode 100644
index 0000000..391201c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.validation.Validation;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+
+/**
+ * Record reader factory for CEF (Common Event Format) content.
+ * <p>
+ * The schema either comes from one of the standard schema access strategies or is inferred from the data. When it is
+ * inferred, {@link #INFERENCE_STRATEGY} selects which groups of fields (headers, standard extensions, custom
+ * extensions) are part of it. Parsing of the events is delegated to the parcefone {@link CEFParser}.
+ */
+@Tags({"cef", "record", "reader", "parser"})
+@CapabilityDescription("Parses CEF (Common Event Format) events, returning each row as a record. "
+    + "This reader allows for inferring a schema based on the first event in the FlowFile or providing an explicit schema for interpreting the values.")
+public final class CEFReader extends SchemaRegistryService implements RecordReaderFactory {
+
+    static final AllowableValue HEADERS_ONLY = new AllowableValue("headers-only", "Headers only", "Includes only CEF header fields into the inferred schema.");
+    static final AllowableValue HEADERS_AND_EXTENSIONS = new AllowableValue("headers-and-extensions", "Headers and extensions",
+            "Includes the CEF header and extension fields to the schema, but not the custom extensions.");
+    static final AllowableValue CUSTOM_EXTENSIONS_AS_STRINGS = new AllowableValue("custom-extensions-as-string", "With custom extensions as strings",
+            "Includes all fields into the inferred schema, involving custom extension fields as string values.");
+    static final AllowableValue CUSTOM_EXTENSIONS_INFERRED = new AllowableValue("custom-extensions-inferred", "With custom extensions inferred",
+            "Includes all fields into the inferred schema, involving custom extension fields with inferred data types. " +
+            "The inference works based on the values in the FlowFile. In some scenarios this might result in unsatisfactory behaviour. " +
+            "In these cases it is suggested to use \"" + CUSTOM_EXTENSIONS_AS_STRINGS.getDisplayName() + "\" Inference Strategy or predefined schema.");
+
+    static final PropertyDescriptor INFERENCE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("inference-strategy")
+            .displayName("Inference Strategy")
+            .description("Defines the set of fields that should be included in the schema and the way the fields are being interpreted.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .allowableValues(HEADERS_ONLY, HEADERS_AND_EXTENSIONS, CUSTOM_EXTENSIONS_AS_STRINGS, CUSTOM_EXTENSIONS_INFERRED)
+            .defaultValue(CUSTOM_EXTENSIONS_INFERRED.getValue())
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA)
+            .build();
+
+    static final PropertyDescriptor RAW_FIELD = new PropertyDescriptor.Builder()
+            .name("raw-message-field")
+            .displayName("Raw Message Field")
+            .description("If set the raw message will be added to the record using the property value as field name. This is not the same as the \"rawEvent\" extension field!")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INVALID_FIELD = new PropertyDescriptor.Builder()
+            .name("invalid-message-field")
+            .displayName("Invalid Field")
+            .description("Used when a line in the FlowFile cannot be parsed by the CEF parser. " +
+                    "If set, instead of failing to process the FlowFile, a record is being added with one field. " +
+                    "This record contains one field with the name specified by the property and the raw message as value.")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DATETIME_REPRESENTATION = new PropertyDescriptor.Builder()
+            .name("datetime-representation")
+            .displayName("DateTime Locale")
+            // The separating spaces and the closing parenthesis matter: the pieces are concatenated verbatim.
+            .description("The IETF BCP 47 representation of the Locale to be used when parsing date " +
+                    "fields with long or short month names (e.g. may <en-US> vs. mai. <fr-FR>). The default " +
+                    "value is generally safe. Only change if having issues parsing CEF messages")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new ValidateLocale())
+            .defaultValue("en-US")
+            .build();
+
+    static final PropertyDescriptor ACCEPT_EMPTY_EXTENSIONS = new PropertyDescriptor.Builder()
+            .name("accept-empty-extensions")
+            .displayName("Accept empty extensions")
+            .description("If set to true, empty extensions will be accepted and will be associated to a null value.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    private final javax.validation.Validator validator = Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    // Configuration captured in onEnabled(). rawMessageField and invalidField are either null ("not configured") or a
+    // non-empty field name; an empty property value is normalized to null.
+    private volatile String rawMessageField;
+    private volatile String invalidField;
+    private volatile Locale parcefoneLocale;
+    private volatile boolean includeCustomExtensions;
+    private volatile boolean acceptEmptyExtensions;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RAW_FIELD);
+        properties.add(INVALID_FIELD);
+        properties.add(DATETIME_REPRESENTATION);
+        properties.add(INFERENCE_STRATEGY);
+
+        // The schema cache is only meaningful when the schema is inferred.
+        properties.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA)
+                .build());
+
+        properties.add(ACCEPT_EMPTY_EXTENSIONS);
+        return properties;
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
+        allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
+        return allowableValues;
+    }
+
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SchemaInferenceUtil.INFER_SCHEMA;
+    }
+
+    /**
+     * Builds a CEF-specific inferring strategy when the schema is to be inferred; otherwise defers to the parent class.
+     *
+     * @param strategy the configured schema access strategy value
+     * @param schemaRegistry the schema registry passed through to the parent implementation
+     * @param context the context whose properties configure the inference
+     * @return the schema access strategy to use
+     */
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+        if (strategy.equals(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
+            final String inferenceStrategy = context.getProperty(INFERENCE_STRATEGY).getValue();
+            final CEFSchemaInferenceBuilder builder = new CEFSchemaInferenceBuilder();
+
+            if (HEADERS_AND_EXTENSIONS.getValue().equals(inferenceStrategy)) {
+                builder.withExtensions();
+            } else if (CUSTOM_EXTENSIONS_AS_STRINGS.getValue().equals(inferenceStrategy)) {
+                builder.withCustomExtensions(CEFCustomExtensionTypeResolver.STRING_RESOLVER);
+            } else if (CUSTOM_EXTENSIONS_INFERRED.getValue().equals(inferenceStrategy)) {
+                builder.withCustomExtensions(CEFCustomExtensionTypeResolver.SIMPLE_RESOLVER);
+            }
+
+            // Read the field names from the given context rather than from the fields set in onEnabled().
+            // NOTE(review): the parent class presumably calls this method while it is being enabled itself; reading
+            // the context avoids depending on whether onEnabled() below has already run (or on stale values).
+            final String rawField = getFieldNameProperty(context, RAW_FIELD);
+            final String invalidRecordField = getFieldNameProperty(context, INVALID_FIELD);
+
+            if (rawField != null) {
+                builder.withRawMessage(rawField);
+            }
+
+            if (invalidRecordField != null) {
+                builder.withInvalidField(invalidRecordField);
+            }
+
+            // Without an invalid field, an unparsable line cannot be represented and inference must fail.
+            final boolean failFast = invalidRecordField == null;
+            final CEFSchemaInference inference = builder.build();
+            return SchemaInferenceUtil.getSchemaAccessStrategy(
+                strategy,
+                context,
+                getLogger(),
+                (variables, in) -> new CEFRecordSource(in, parser, parcefoneLocale, acceptEmptyExtensions, failFast),
+                () -> inference,
+                () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, context));
+        }
+
+        return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
+    }
+
+    /**
+     * Captures the configuration used by the readers created later.
+     *
+     * @param context the configuration context of the controller service
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        rawMessageField = getFieldNameProperty(context, RAW_FIELD);
+        invalidField = getFieldNameProperty(context, INVALID_FIELD);
+        parcefoneLocale = Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).evaluateAttributeExpressions().getValue());
+        acceptEmptyExtensions = context.getProperty(ACCEPT_EMPTY_EXTENSIONS).asBoolean();
+
+        final boolean isInferSchema = SchemaInferenceUtil.INFER_SCHEMA.getValue().equals(context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue());
+        if (isInferSchema) {
+            // Custom extensions can only be skipped when the inferred schema is known not to contain them.
+            final String inferenceStrategy = context.getProperty(INFERENCE_STRATEGY).getValue();
+            includeCustomExtensions = !HEADERS_ONLY.getValue().equals(inferenceStrategy) && !HEADERS_AND_EXTENSIONS.getValue().equals(inferenceStrategy);
+        } else {
+            // An explicitly provided schema may reference any custom extension.
+            includeCustomExtensions = true;
+        }
+    }
+
+    @Override
+    public RecordReader createRecordReader(
+        final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger
+    ) throws MalformedRecordException, IOException, SchemaNotFoundException {
+        final RecordSchema schema = getSchema(variables, in, null);
+        return new CEFRecordReader(in, schema, parser, logger, parcefoneLocale, rawMessageField, invalidField, includeCustomExtensions, acceptEmptyExtensions);
+    }
+
+    /**
+     * Reads an optional field-name property and evaluates Expression Language against the variable registry.
+     *
+     * @param context the context to read from
+     * @param descriptor the field-name property to read
+     * @return the configured field name, or {@code null} when the property is unset or empty
+     */
+    private static String getFieldNameProperty(final PropertyContext context, final PropertyDescriptor descriptor) {
+        final String value = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+        return value == null || value.isEmpty() ? null : value;
+    }
+
+    /**
+     * Rejects field names that clash with a CEF header field or a standard CEF extension field.
+     */
+    private static class ValidateRawField implements Validator {
+        private final Set<String> headerFields = CEFSchemaUtil.getHeaderFields().stream().map(r -> r.getFieldName()).collect(Collectors.toSet());
+        private final Set<String> extensionFields = CEFSchemaUtil.getExtensionTypeMapping().keySet().stream().flatMap(fields -> fields.stream()).collect(Collectors.toSet());
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (headerFields.contains(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                        .explanation(input + " is one of the CEF header fields.").build();
+            }
+
+            if (extensionFields.contains(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                        .explanation(input + " is one of the CEF extension fields.").build();
+            }
+
+            // Any name that is not a CEF-defined field is accepted, as are null and empty values.
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java
new file mode 100644
index 0000000..10aa55b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CEFHandlingException;
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * CEF (Common Event Format) based implementation for {@code org.apache.nifi.serialization.RecordReader}. The implementation
+ * builds on top of the {@code com.fluenda.parcefone.parser.CEFParser} which is responsible to deal with the textual representation
+ * of the events. This implementation intends to fill the gap between the CEF parser's {@code com.fluenda.parcefone.event.CommonEvent}
+ * data objects and the NiFi's internal Record representation.
+ */
+final class CEFRecordReader implements RecordReader {
+    /**
+     * Currently these are not used but set the way it follows the expected CEF format.
+     */
+    private static final String DATE_FORMAT = "MMM dd yyyy";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+    private static final String DATETIME_FORMAT = DATE_FORMAT + " " + TIME_FORMAT;
+
+    private static final Supplier<DateFormat> DATE_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(DATE_FORMAT);
+    private static final Supplier<DateFormat> TIME_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(TIME_FORMAT);
+    private static final Supplier<DateFormat> DATETIME_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(DATETIME_FORMAT);
+
+    private final RecordSchema schema;
+    private final BufferedReader reader;
+    private final CEFParser parser;
+    private final ComponentLog logger;
+    private final Locale locale;
+    private final String rawMessageField;
+    private final String invalidField;
+
+    /**
+     * It would not cause any functional drawback to acquire the custom extensions all the time, but there are some cases
+     * (inferred schema when custom extension fields are not included) where we can be sure about that these fields are not
+     * required. For better performance, in these cases we do not work with these fields.
+     */
+    private final boolean includeCustomExtensions;
+    private final boolean acceptEmptyExtensions;
+
+    CEFRecordReader(
+        final InputStream inputStream,
+        final RecordSchema recordSchema,
+        final CEFParser parser,
+        final ComponentLog logger,
+        final Locale locale,
+        final String rawMessageField,
+        final String invalidField,
+        final boolean includeCustomExtensions,
+        final boolean acceptEmptyExtensions
+    ) {
+        this.reader = new BufferedReader(new InputStreamReader(inputStream));
+        this.schema = recordSchema;
+        this.parser = parser;
+        this.logger = logger;
+        this.locale = locale;
+        this.rawMessageField = rawMessageField;
+        this.invalidField = invalidField;
+        this.includeCustomExtensions = includeCustomExtensions;
+        this.acceptEmptyExtensions = acceptEmptyExtensions;
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        final String line = nextLine();
+
+        if (line == null) {
+            return null;
+        }
+
+        final CommonEvent event = parser.parse(line, true, acceptEmptyExtensions, locale);
+
+        if (event == null) {
+            logger.debug("Event parsing resulted no event");
+
+            if (invalidField != null && !invalidField.isEmpty()) {
+                return new MapRecord(schema, Collections.singletonMap(invalidField, line));
+            } else {
+                throw new MalformedRecordException("The following line could not be parsed by the CEF parser: " + line);
+            }
+        }
+
+        final Map<String, Object> values = new HashMap<>();
+
+        try {
+            event.getHeader().entrySet().forEach(field -> values.put(field.getKey(), convertValue(field.getKey(), field.getValue(), coerceTypes)));
+            event.getExtension(true, includeCustomExtensions).entrySet().forEach(field -> values.put(field.getKey(), convertValue(field.getKey() ,field.getValue(), coerceTypes)));
+
+            for (final String fieldName : schema.getFieldNames()) {
+                if (!values.containsKey(fieldName)) {
+                    values.put(fieldName, null);
+                }
+            }
+
+        } catch (final CEFHandlingException e) {
+            throw new MalformedRecordException("Error during extracting information from CEF event", e);
+        }
+
+        if (rawMessageField != null) {
+            values.put(rawMessageField, line);
+        }
+
+        return new MapRecord(schema, values, true, dropUnknownFields);
+    }
+
+    private String nextLine() throws IOException {
+        String line;
+
+        while((line = reader.readLine()) != null) {
+            if (!line.isEmpty()) {
+                break;
+            }
+        }
+
+        return line;
+    }
+
+    private Object convertValue(final String fieldName, final Object fieldValue, final boolean coerceType) {
+        final DataType dataType = schema.getDataType(fieldName).orElse(RecordFieldType.STRING.getDataType());
+
+        return coerceType
+            ? convert(fieldValue, dataType, fieldName)
+            : convertSimpleIfPossible(fieldValue, dataType, fieldName);
+    }
+
+    private Object convert(final Object value, final DataType dataType, final String fieldName) {
+        return DataTypeUtils.convertType(prepareValue(value), dataType, DATE_FORMAT_SUPPLIER, TIME_FORMAT_SUPPLIER, DATETIME_FORMAT_SUPPLIER, fieldName);
+    }
+
+    private Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final Object preparedValue = prepareValue(value);
+
+        switch (dataType.getFieldType()) {
+            case STRING:
+                return preparedValue;
+            case BOOLEAN:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+                if (DataTypeUtils.isCompatibleDataType(preparedValue, dataType)) {
+                    return DataTypeUtils.convertType(preparedValue, dataType, DATE_FORMAT_SUPPLIER, TIME_FORMAT_SUPPLIER, DATETIME_FORMAT_SUPPLIER, fieldName);
+                }
+                break;
+            case TIMESTAMP:
+                if (DataTypeUtils.isTimestampTypeCompatible(preparedValue, DATETIME_FORMAT)) {
+                    return DataTypeUtils.convertType(preparedValue, dataType, DATE_FORMAT_SUPPLIER, TIME_FORMAT_SUPPLIER, DATETIME_FORMAT_SUPPLIER, fieldName);
+                }
+                break;
+        }
+
+        return value;
+    }
+
+    private Object prepareValue(final Object value) {
+        // InetAddress event fields are mapped as string values internally. Inet4Address and Inet6Address fields are handled by this too.
+        if (value instanceof InetAddress) {
+            return ((InetAddress) value).getHostAddress();
+        }
+
+        return value;
+    }
+
+    @Override
+    public RecordSchema getSchema() throws MalformedRecordException {
+        return schema;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordSource.java
new file mode 100644
index 0000000..505ad57
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordSource.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.nifi.schema.inference.RecordSource;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Locale;
+
+/**
+ * {@link RecordSource} producing one parsed CEF event per non-empty line of the input stream.
+ * <p>
+ * The input is decoded as UTF-8 regardless of the platform default charset.
+ */
+final class CEFRecordSource implements RecordSource<CommonEvent> {
+    private final CEFParser parser;
+    private final BufferedReader reader;
+    private final Locale locale;
+    private final boolean acceptEmptyExtensions;
+    private final boolean failFast;
+
+    CEFRecordSource(final InputStream in, final CEFParser parser, final Locale locale, final boolean acceptEmptyExtensions, final boolean failFast) {
+        this.parser = parser;
+        // Decode explicitly instead of depending on the JVM's default charset
+        this.reader = new BufferedReader(new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
+        this.locale = locale;
+        this.acceptEmptyExtensions = acceptEmptyExtensions;
+        this.failFast = failFast;
+    }
+
+    /**
+     * Returns the next successfully parsed event.
+     * <p>
+     * Empty lines are skipped. When the parser cannot handle a line (it returns {@code null}), an
+     * {@link IOException} is thrown if fail-fast is enabled; otherwise the line is skipped. Returning {@code null}
+     * for such a line would be indistinguishable from the end of the input for callers.
+     *
+     * @return the next parsed event, or {@code null} once the input is exhausted
+     * @throws IOException if reading fails, or if a line cannot be parsed while fail-fast is enabled
+     */
+    @Override
+    public CommonEvent next() throws IOException {
+        String line;
+
+        while ((line = nextLine()) != null) {
+            final CommonEvent event = parser.parse(line, true, acceptEmptyExtensions, locale);
+
+            if (event != null) {
+                return event;
+            }
+
+            if (failFast) {
+                throw new IOException("Could not parse event");
+            }
+            // Not failing fast: skip the invalid line and continue with the next one
+        }
+
+        return null;
+    }
+
+    /**
+     * Reads the next non-empty line.
+     *
+     * @return the next non-empty line, or {@code null} at the end of the input
+     * @throws IOException if reading fails
+     */
+    private String nextLine() throws IOException {
+        String line;
+
+        while ((line = reader.readLine()) != null) {
+            if (!line.isEmpty()) {
+                break;
+            }
+        }
+        return line;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaInference.java
new file mode 100644
index 0000000..a3348a6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaInference.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CEFHandlingException;
+import com.fluenda.parcefone.event.CommonEvent;
+import org.apache.nifi.schema.inference.FieldTypeInference;
+import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Schema inference engine for CEF events.
+ * <p>
+ * The inferred schema always starts with the CEF header fields. When extensions are included, each extension listed in
+ * {@link CEFSchemaUtil#getExtensionTypeMapping()} that occurs in at least one event is appended with its predefined type.
+ * When custom extensions are included as well, the types of the remaining extensions are inferred from their values with
+ * the configured {@link CEFCustomExtensionTypeResolver}. The optional raw message and invalid fields are appended last.
+ */
+final class CEFSchemaInference implements SchemaInferenceEngine<CommonEvent> {
+    private final boolean includeExtensions;
+    private final boolean includeCustomExtensions;
+    private final CEFCustomExtensionTypeResolver typeResolver;
+    private final String rawMessageField;
+    private final String invalidField;
+
+    /**
+     * @param includeExtensions whether extension fields are added to the schema
+     * @param includeCustomExtensions whether extensions missing from the extension dictionary are added as well
+     * @param typeResolver resolves the data type of a custom extension value
+     * @param rawMessageField name of the raw message string field, or {@code null} to omit it
+     * @param invalidField name of the invalid message string field, or {@code null} to omit it
+     */
+    CEFSchemaInference(
+            final boolean includeExtensions,
+            final boolean includeCustomExtensions,
+            final CEFCustomExtensionTypeResolver typeResolver,
+            final String rawMessageField,
+            final String invalidField) {
+        this.includeExtensions = includeExtensions;
+        this.includeCustomExtensions = includeCustomExtensions;
+        this.typeResolver = typeResolver;
+        this.rawMessageField = rawMessageField;
+        this.invalidField = invalidField;
+    }
+
+    @Override
+    public RecordSchema inferSchema(final RecordSource<CommonEvent> recordSource) throws IOException {
+        // Header fields always come first in the schema
+        final List<RecordField> schemaFields = new ArrayList<>(CEFSchemaUtil.getHeaderFields());
+        final Set<String> addedExtensionNames = new HashSet<>();
+        final Map<String, FieldTypeInference> customExtensionTypes = new LinkedHashMap<>();
+
+        try {
+            // Every event is read, even when extensions are not needed, so that the events are still checked for validity
+            for (CommonEvent event = recordSource.next(); event != null; event = recordSource.next()) {
+                if (!includeExtensions) {
+                    continue;
+                }
+
+                for (final Map.Entry<String, Object> extension : event.getExtension(true, includeCustomExtensions).entrySet()) {
+                    final String extensionName = extension.getKey();
+                    final Optional<RecordField> dictionaryField = getExtensionField(extensionName);
+
+                    if (!dictionaryField.isPresent()) {
+                        if (includeCustomExtensions) {
+                            // CommonEvent will always return custom extensions as String values
+                            customExtensionTypes
+                                    .computeIfAbsent(extensionName, key -> new FieldTypeInference())
+                                    .addPossibleDataType(typeResolver.resolve((String) extension.getValue()));
+                        }
+                    } else if (addedExtensionNames.add(extensionName)) {
+                        // A dictionary extension enters the schema only on its first occurrence
+                        schemaFields.add(dictionaryField.get());
+                    }
+                }
+            }
+        } catch (final CEFHandlingException e) {
+            throw new IOException(e);
+        }
+
+        for (final Map.Entry<String, FieldTypeInference> customExtension : customExtensionTypes.entrySet()) {
+            schemaFields.add(new RecordField(customExtension.getKey(), customExtension.getValue().toDataType(), true));
+        }
+
+        if (rawMessageField != null) {
+            schemaFields.add(new RecordField(rawMessageField, RecordFieldType.STRING.getDataType()));
+        }
+
+        if (invalidField != null) {
+            schemaFields.add(new RecordField(invalidField, RecordFieldType.STRING.getDataType()));
+        }
+
+        return new SimpleRecordSchema(schemaFields);
+    }
+
+    /**
+     * Looks up an extension in the extension dictionary.
+     *
+     * @param fieldName the extension key
+     * @return a field typed according to the dictionary, or empty when the extension is not part of the dictionary
+     */
+    private Optional<RecordField> getExtensionField(final String fieldName) {
+        final Map<Set<String>, DataType> typeMapping = CEFSchemaUtil.getExtensionTypeMapping();
+
+        return typeMapping.entrySet().stream()
+                .filter(mapping -> mapping.getKey().contains(fieldName))
+                .map(mapping -> new RecordField(fieldName, mapping.getValue()))
+                .findFirst();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaInferenceBuilder.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaInferenceBuilder.java
new file mode 100644
index 0000000..4502eab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaInferenceBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+/**
+ * Builder for {@link CEFSchemaInference} instances.
+ * <p>
+ * Unless configured otherwise, the built inference includes neither extensions nor custom extensions, adds neither a
+ * raw message field nor an invalid field, and uses {@link CEFCustomExtensionTypeResolver#SKIPPING_RESOLVER}.
+ */
+final class CEFSchemaInferenceBuilder {
+    private String rawMessageField;
+    private String invalidField;
+    private boolean includeExtensions;
+    private boolean includeCustomExtensions;
+    private CEFCustomExtensionTypeResolver dataTypeResolver = CEFCustomExtensionTypeResolver.SKIPPING_RESOLVER;
+
+    /**
+     * Sets the name of the string field holding the raw message.
+     *
+     * @param rawMessageField field name; {@code null} leaves the field out
+     * @return this builder
+     */
+    CEFSchemaInferenceBuilder withRawMessage(final String rawMessageField) {
+        this.rawMessageField = rawMessageField;
+        return this;
+    }
+
+    /**
+     * Sets the name of the string field holding invalid messages.
+     *
+     * @param invalidField field name; {@code null} leaves the field out
+     * @return this builder
+     */
+    CEFSchemaInferenceBuilder withInvalidField(final String invalidField) {
+        this.invalidField = invalidField;
+        return this;
+    }
+
+    /**
+     * Includes the extensions known from the extension dictionary.
+     *
+     * @return this builder
+     */
+    CEFSchemaInferenceBuilder withExtensions() {
+        this.includeExtensions = true;
+        return this;
+    }
+
+    /**
+     * Includes custom extensions, typed by the given resolver. Extensions are switched on too, because custom
+     * extensions are only processed when extensions are included.
+     *
+     * @param dataTypeResolver resolver deciding the data type of custom extension values
+     * @return this builder
+     */
+    CEFSchemaInferenceBuilder withCustomExtensions(final CEFCustomExtensionTypeResolver dataTypeResolver) {
+        this.includeExtensions = true;
+        this.includeCustomExtensions = true;
+        this.dataTypeResolver = dataTypeResolver;
+        return this;
+    }
+
+    /**
+     * Creates the inference engine from the current configuration.
+     *
+     * @return a new {@link CEFSchemaInference}
+     */
+    CEFSchemaInference build() {
+        return new CEFSchemaInference(
+                includeExtensions,
+                includeCustomExtensions,
+                dataTypeResolver,
+                rawMessageField,
+                invalidField);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaUtil.java
new file mode 100644
index 0000000..5e5e406
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Static schema information for CEF records: the header fields and the data types of the extensions listed in the
+ * extension dictionary. The returned collections are shared between callers and are therefore unmodifiable.
+ */
+final class CEFSchemaUtil {
+
+    private static final List<RecordField> HEADER_FIELDS;
+
+    static {
+        final List<RecordField> headerFields = new ArrayList<>();
+        headerFields.add(new RecordField("version", RecordFieldType.INT.getDataType()));
+        headerFields.add(new RecordField("deviceVendor", RecordFieldType.STRING.getDataType()));
+        headerFields.add(new RecordField("deviceProduct", RecordFieldType.STRING.getDataType()));
+        headerFields.add(new RecordField("deviceVersion", RecordFieldType.STRING.getDataType()));
+        headerFields.add(new RecordField("deviceEventClassId", RecordFieldType.STRING.getDataType()));
+        headerFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        // Reference states that severity might be represented by integer values (0-10) and string values (like High) too,
+        // hence it is typed as a string
+        headerFields.add(new RecordField("severity", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS = Collections.unmodifiableList(headerFields);
+    }
+
+    // Fields known by the CEF Extension Dictionary in version 23
+    // NOTE(review): destinationTranslatedPort is typed as STRING below while sourceTranslatedPort is INT; confirm this
+    // matches the value type the parser produces for it.
+    private static final Set<String> EXTENSIONS_STRING = fieldNames("act", "app", "c6a1Label", "c6a2Label", "c6a3Label",
+            "c6a4Label", "cfp1Label", "cfp2Label", "cfp3Label", "cfp4Label", "cn1Label", "cn2Label", "cn3Label", "cs1", "cs1Label",
+            "cs2", "cs2Label", "cs3", "cs3Label", "cs4", "cs4Label", "cs5", "cs5Label", "cs6", "cs6Label", "destinationDnsDomain",
+            "destinationServiceName", "destinationTranslatedAddress", "destinationTranslatedPort", "deviceCustomDate1Label", "deviceCustomDate2Label",
+            "deviceDnsDomain", "deviceExternalId", "deviceFacility", "deviceInboundInterface", "deviceNtDomain", "deviceOutboundInterface",
+            "deviceProcessName", "dhost", "dntdom", "dpriv", "dproc", "dtz", "duid", "duser", "dvchost", "externalId", "fileHash", "field", "filePath",
+            "filePermission", "fileType", "flexDate1Label", "flexNumber1Label", "flexNumber2Label", "flexString1", "flexString1Label",
+            "flexString2", "flexString2Label", "fname", "msg", "oldFileHash", "oldField", "oldFileName", "oldFilePath", "oldFilePermission",
+            "oldFileType", "outcome", "proto", "reason", "request", "requestClientApplication", "requestContext", "requestCookies", "requestMethod",
+            "shost", "sntdom", "sourceDnsDomain", "sourceServiceName", "spriv", "sproc", "suid", "suser", "agentDnsDomain", "agentNtDomain",
+            "agentTranslatedZoneExternalID", "agentTranslatedZoneURI", "ahost", "aid", "at", "atz", "av", "cat", "customerExternalID", "customerURI",
+            "destinationTranslatedZoneExternalID", "destinationTranslatedZoneURI", "destinationZoneExternalID", "destinationZoneURI",
+            "deviceTranslatedZoneExternalID", "deviceTranslatedZoneURI", "deviceZoneExternalID", "deviceZoneURI", "rawEvent", "sourceTranslatedZoneExternalID",
+            "sourceTranslatedZoneURI", "sourceZoneExternalID", "sourceZoneURI");
+    private static final Set<String> EXTENSIONS_INTEGER = fieldNames("cnt", "deviceDirection", "dpid", "dpt", "dvcpid", "fsize", "in",
+            "oldFileSize", "out", "sourceTranslatedPort", "spid", "spt", "type");
+    private static final Set<String> EXTENSIONS_LONG = fieldNames("cn1", "cn2", "cn3", "flexNumber1", "flexNumber2", "eventId");
+    private static final Set<String> EXTENSIONS_FLOAT = fieldNames("cfp1", "cfp2", "cfp3", "cfp4");
+    private static final Set<String> EXTENSIONS_DOUBLE = fieldNames("dlat", "dlong", "slat", "slong");
+    private static final Set<String> EXTENSIONS_INET_ADDRESS = fieldNames("agt");
+    private static final Set<String> EXTENSIONS_IPV4_ADDRESS = fieldNames("deviceTranslatedAddress", "dst", "dvc", "sourceTranslatedAddress", "src",
+            "agentTranslatedAddress");
+    private static final Set<String> EXTENSIONS_IPV6_ADDRESS = fieldNames("c6a1", "c6a2", "c6a3", "c6a4");
+    private static final Set<String> EXTENSIONS_MAC_ADDRESS = fieldNames("dmac", "dvcmac", "smac", "amac");
+    private static final Set<String> EXTENSIONS_TIMESTAMP = fieldNames("deviceCustomDate1", "deviceCustomDate2", "end", "fileCreatedTime",
+            "fileModificationTime", "flexDate1", "oldFileCreateTime", "oldFileModificationTime", "rt", "start", "art");
+
+    private static final Map<Set<String>, DataType> EXTENSION_TYPE_MAPPING;
+
+    static {
+        final Map<Set<String>, DataType> mapping = new HashMap<>();
+        mapping.put(EXTENSIONS_STRING, RecordFieldType.STRING.getDataType());
+        mapping.put(EXTENSIONS_INTEGER, RecordFieldType.INT.getDataType());
+        mapping.put(EXTENSIONS_LONG, RecordFieldType.LONG.getDataType());
+        mapping.put(EXTENSIONS_FLOAT, RecordFieldType.FLOAT.getDataType());
+        mapping.put(EXTENSIONS_DOUBLE, RecordFieldType.DOUBLE.getDataType());
+        // Address values are represented as strings
+        mapping.put(EXTENSIONS_INET_ADDRESS, RecordFieldType.STRING.getDataType());
+        mapping.put(EXTENSIONS_IPV4_ADDRESS, RecordFieldType.STRING.getDataType());
+        mapping.put(EXTENSIONS_IPV6_ADDRESS, RecordFieldType.STRING.getDataType());
+        mapping.put(EXTENSIONS_MAC_ADDRESS, RecordFieldType.STRING.getDataType());
+        mapping.put(EXTENSIONS_TIMESTAMP, RecordFieldType.TIMESTAMP.getDataType());
+        EXTENSION_TYPE_MAPPING = Collections.unmodifiableMap(mapping);
+    }
+
+    /**
+     * @return the CEF header fields in schema order, as an unmodifiable list
+     */
+    static List<RecordField> getHeaderFields() {
+        return HEADER_FIELDS;
+    }
+
+    /**
+     * @return an unmodifiable mapping from sets of dictionary extension names to the data type of those extensions
+     */
+    static Map<Set<String>, DataType> getExtensionTypeMapping() {
+        return EXTENSION_TYPE_MAPPING;
+    }
+
+    /**
+     * Creates an unmodifiable set of extension names.
+     *
+     * @param values the extension names
+     * @return unmodifiable set containing {@code values}
+     */
+    private static Set<String> fieldNames(final String... values) {
+        return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(values)));
+    }
+
+    private CEFSchemaUtil() {
+        // Util class, not to be instantiated
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/ValidateLocale.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/ValidateLocale.java
new file mode 100644
index 0000000..72549fa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/ValidateLocale.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import java.util.Locale;
+
+/**
+ * Validates that a property value is a non-empty language tag which {@link Locale#forLanguageTag(String)} resolves to a
+ * locale other than {@link Locale#ROOT}.
+ * <p>
+ * Mirrors {@code org.apache.nifi.processors.standard.ParseCEF.ValidateLocale}, but compares against {@link Locale#ROOT}
+ * directly instead of relying on the ordering of {@link Locale#getAvailableLocales()}, which is not specified.
+ */
+public class ValidateLocale implements Validator {
+    @Override
+    public ValidationResult validate(String subject, String input, ValidationContext context) {
+        if (null == input || input.isEmpty()) {
+            return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                    .explanation(subject + " cannot be empty").build();
+        }
+
+        final Locale testLocale = Locale.forLanguageTag(input);
+
+        // forLanguageTag yields the root ("null") locale when no usable language subtag could be parsed
+        if (Locale.ROOT.equals(testLocale)) {
+            return new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                    .explanation(input + " is not a valid locale format.").build();
+        }
+
+        return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
index 5d195f3..7da3b61 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVSchemaInference.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.csv;
 
 import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.nifi.schema.inference.FieldTypeInference;
 import org.apache.nifi.schema.inference.RecordSource;
 import org.apache.nifi.schema.inference.SchemaInferenceEngine;
@@ -25,15 +24,14 @@ import org.apache.nifi.schema.inference.TimeValueInference;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.SchemaInferenceUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFieldNames> {
 
@@ -43,7 +41,6 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
         this.timeValueInference = timeValueInference;
     }
 
-
     @Override
     public RecordSchema inferSchema(final RecordSource<CSVRecordAndFieldNames> recordSource) throws IOException {
         final Map<String, FieldTypeInference> typeMap = new LinkedHashMap<>();
@@ -67,7 +64,6 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
         return createSchema(typeMap);
     }
 
-
     private void inferSchema(final CSVRecordAndFieldNames recordAndFieldNames, final Map<String, FieldTypeInference> typeMap) {
         final CSVRecord csvRecord = recordAndFieldNames.getRecord();
         for (final String fieldName : recordAndFieldNames.getFieldNames()) {
@@ -78,7 +74,7 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
 
             final FieldTypeInference typeInference = typeMap.computeIfAbsent(fieldName, key -> new FieldTypeInference());
             final String trimmed = trim(value);
-            final DataType dataType = getDataType(trimmed);
+            final DataType dataType = SchemaInferenceUtil.getDataType(trimmed, timeValueInference);
             typeInference.addPossibleDataType(dataType);
         }
     }
@@ -87,56 +83,9 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
         return (value.length() > 1) && value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
     }
 
-
-    private DataType getDataType(final String value) {
-        if (value == null || value.isEmpty()) {
-            return null;
-        }
-
-        if (NumberUtils.isParsable(value)) {
-            if (value.contains(".")) {
-                try {
-                    final double doubleValue = Double.parseDouble(value);
-
-                    if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
-                        return RecordFieldType.DECIMAL.getDecimalDataType(value.length() - 1, value.length() - 1 - value.indexOf("."));
-                    }
-
-                    if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
-                        return RecordFieldType.DOUBLE.getDataType();
-                    }
-
-                    return RecordFieldType.FLOAT.getDataType();
-                } catch (final NumberFormatException nfe) {
-                    return RecordFieldType.STRING.getDataType();
-                }
-            }
-
-            try {
-                final long longValue = Long.parseLong(value);
-                if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
-                    return RecordFieldType.LONG.getDataType();
-                }
-
-                return RecordFieldType.INT.getDataType();
-            } catch (final NumberFormatException nfe) {
-                return RecordFieldType.STRING.getDataType();
-            }
-        }
-
-        if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
-            return RecordFieldType.BOOLEAN.getDataType();
-        }
-
-        final Optional<DataType> timeDataType = timeValueInference.getDataType(value);
-        return timeDataType.orElse(RecordFieldType.STRING.getDataType());
-    }
-
-
     private RecordSchema createSchema(final Map<String, FieldTypeInference> inferences) {
         final List<RecordField> recordFields = new ArrayList<>(inferences.size());
         inferences.forEach((fieldName, type) -> recordFields.add(new RecordField(fieldName, type.toDataType(), true)));
         return new SimpleRecordSchema(recordFields);
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/util/SchemaInferenceUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/util/SchemaInferenceUtil.java
new file mode 100644
index 0000000..0a8e08c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/util/SchemaInferenceUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.Optional;
+
+/**
+ * Helpers shared by schema inference engines for deriving a {@link DataType} from a textual value.
+ */
+public final class SchemaInferenceUtil {
+    private SchemaInferenceUtil() {
+        // Not intended to be instantiated
+    }
+
+    /**
+     * Infers the data type of a textual value without date/time detection.
+     *
+     * @param value the text to inspect, may be {@code null}
+     * @return {@code null} for a {@code null} or empty value, otherwise a numeric, boolean or string type
+     */
+    public static DataType getDataType(final String value) {
+        return inferDataType(value, null);
+    }
+
+    /**
+     * Infers the data type of a textual value, consulting {@code timeValueInference} for values that are neither
+     * numeric nor boolean.
+     *
+     * @param value the text to inspect, may be {@code null}
+     * @param timeValueInference detects date, time and timestamp values; when {@code null}, no time detection is done
+     * @return {@code null} for a {@code null} or empty value, otherwise the inferred type ({@code STRING} as fallback)
+     */
+    public static DataType getDataType(final String value, final TimeValueInference timeValueInference) {
+        return inferDataType(value, timeValueInference);
+    }
+
+    private static DataType inferDataType(final String value, final TimeValueInference timeValueInference) {
+        if (value == null || value.isEmpty()) {
+            return null;
+        }
+
+        if (NumberUtils.isParsable(value)) {
+            return value.contains(".") ? inferDecimalType(value) : inferIntegralType(value);
+        }
+
+        if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+
+        if (timeValueInference == null) {
+            return RecordFieldType.STRING.getDataType();
+        }
+
+        final Optional<DataType> timeDataType = timeValueInference.getDataType(value);
+        return timeDataType.orElse(RecordFieldType.STRING.getDataType());
+    }
+
+    /**
+     * Infers the type of a parsable number containing a decimal point.
+     */
+    private static DataType inferDecimalType(final String value) {
+        try {
+            final double doubleValue = Double.parseDouble(value);
+
+            if (Double.isInfinite(doubleValue)) {
+                // Too large for a double: use DECIMAL. Precision counts digits only, so the decimal point and any sign are excluded.
+                final char first = value.charAt(0);
+                final int signLength = (first == '-' || first == '+') ? 1 : 0;
+                final int precision = value.length() - 1 - signLength;
+                final int scale = value.length() - 1 - value.indexOf('.');
+                return RecordFieldType.DECIMAL.getDecimalDataType(precision, scale);
+            }
+
+            // NOTE(review): Float.MIN_VALUE is the smallest positive float, not the most negative one, so zero and all
+            // negative values are classified as DOUBLE here. Kept unchanged to avoid altering inferred schemas; confirm intent.
+            if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
+                return RecordFieldType.DOUBLE.getDataType();
+            }
+
+            return RecordFieldType.FLOAT.getDataType();
+        } catch (final NumberFormatException nfe) {
+            return RecordFieldType.STRING.getDataType();
+        }
+    }
+
+    /**
+     * Infers the type of a parsable number without a decimal point.
+     */
+    private static DataType inferIntegralType(final String value) {
+        try {
+            final long longValue = Long.parseLong(value);
+            final boolean fitsInInt = longValue >= Integer.MIN_VALUE && longValue <= Integer.MAX_VALUE;
+            return fitsInInt ? RecordFieldType.INT.getDataType() : RecordFieldType.LONG.getDataType();
+        } catch (final NumberFormatException nfe) {
+            // e.g. values outside the range of a long
+            return RecordFieldType.STRING.getDataType();
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
index c4e2e13..72cb259 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/inference/XmlSchemaInference.java
@@ -16,15 +16,13 @@
  */
 package org.apache.nifi.xml.inference;
 
-import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.nifi.schema.inference.HierarchicalSchemaInference;
 import org.apache.nifi.schema.inference.TimeValueInference;
 import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.SchemaInferenceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -49,50 +47,9 @@ public class XmlSchemaInference extends HierarchicalSchemaInference<XmlNode> {
     }
 
     public DataType inferTextualDataType(final String text) {
-        if (text == null || text.isEmpty()) {
-            return null;
-        }
-
-        if (NumberUtils.isParsable(text)) {
-            if (text.contains(".")) {
-                try {
-                    final double doubleValue = Double.parseDouble(text);
-
-                    if (doubleValue == Double.POSITIVE_INFINITY || doubleValue == Double.NEGATIVE_INFINITY) {
-                        return RecordFieldType.DECIMAL.getDecimalDataType(text.length() - 1, text.length() - 1 - text.indexOf("."));
-                    }
-
-                    if (doubleValue > Float.MAX_VALUE || doubleValue < Float.MIN_VALUE) {
-                        return RecordFieldType.DOUBLE.getDataType();
-                    }
-
-                    return RecordFieldType.FLOAT.getDataType();
-                } catch (final NumberFormatException nfe) {
-                    return RecordFieldType.STRING.getDataType();
-                }
-            }
-
-            try {
-                final long longValue = Long.parseLong(text);
-                if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
-                    return RecordFieldType.LONG.getDataType();
-                }
-
-                return RecordFieldType.INT.getDataType();
-            } catch (final NumberFormatException nfe) {
-                return RecordFieldType.STRING.getDataType();
-            }
-        }
-
-        if (text.equalsIgnoreCase("true") || text.equalsIgnoreCase("false")) {
-            return RecordFieldType.BOOLEAN.getDataType();
-        }
-
-        final Optional<DataType> timeDataType = timeValueInference.getDataType(text);
-        return timeDataType.orElse(RecordFieldType.STRING.getDataType());
+        return SchemaInferenceUtil.getDataType(text, timeValueInference);
     }
 
-
     @Override
     protected boolean isObject(final XmlNode value) {
         return value.getNodeType() == XmlNodeType.CONTAINER;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 88745e3..ecc42bf 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -26,6 +26,8 @@ org.apache.nifi.lookup.ReaderLookup
 org.apache.nifi.csv.CSVReader
 org.apache.nifi.csv.CSVRecordSetWriter
 
+org.apache.nifi.cef.CEFReader
+
 org.apache.nifi.grok.GrokReader
 
 org.apache.nifi.text.FreeFormTextRecordSetWriter
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.cef.CEFReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.cef.CEFReader/additionalDetails.html
new file mode 100644
index 0000000..96bc9c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.cef.CEFReader/additionalDetails.html
@@ -0,0 +1,199 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8"/>
+        <title>CEFReader</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    </head>
+
+	<body>
+
+        <p>
+            The CEFReader Controller Service serves as a means to read and interpret CEF messages. Unlike the ParseCEF Processor, the
+            CEFReader is intended to give users access to the whole range of NiFi's record processing tools, making it easy to work with a
+            wide variety of incoming CEF messages.
+        </p>
+
+        <p>
+            The reader supports CEF Version 23. The expected format and the extension fields known by the Extension Dictionary are defined by
+            the description of the ArcSight Common Event Format. The reader can work with Syslog prefixes and custom extensions. A few
+            examples of CEF messages the reader can work with:
+        </p>
+
+        <code><pre>
+CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23  dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789 loginsequence=123
+        </pre></code>
+
+        <h3>Raw message</h3>
+
+        <p>
+            It is possible to preserve the original message in the produced record. This comes in handy when the message contains a Syslog
+            prefix which is not part of the Record instance. In order to preserve the raw message, the "Raw Message Field" property must be set.
+            The reader will use the value of this property as the field name and will add the raw message as a custom extension field. The value of the
+            "Raw Message Field" must differ from the header fields and the extension fields known by the CEF Extension Dictionary. If the property is
+            empty, the raw message will not be added.
+        </p>
+
+        <p>
+            When using a predefined schema, the field defined by the "Raw Message Field" property must appear in it as a STRING record field. If the
+            schema is inferred, the field will be automatically added as an additional custom extension, regardless of the Inference Strategy.
+        </p>
+
+        <h2>Schemas and Type Coercion</h2>
+
+        <p>
+            When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
+            configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
+            the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
+            is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
+            schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
+            into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
+        </p>
+
+        <p>
+            The following rules apply when attempting to coerce a field value from one data type to another:
+        </p>
+
+        <ul>
+            <li>Any data type can be coerced into a String type.</li>
+            <li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
+            <li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
+                milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+            <li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
+                or "Timestamp Format."</li>
+            <li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
+                <code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
+                type but not an Integer.</li>
+            <li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
+            <li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
+                and the rest of the characters are ignored.</li>
+            <li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
+            <li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+            <li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
+                property (Date Format, Time Format, Timestamp Format property).</li>
+        </ul>
+
+        <p>
+            If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
+            will be thrown.
+        </p>
+
+        <h2>Schema inference</h2>
+
+        <p>
+            While NiFi's Record API does require that each Record have a schema, it is often convenient to infer the schema based on the values in the data,
+            rather than having to manually create a schema. This is accomplished by selecting a value of "Infer Schema" for the "Schema Access Strategy" property.
+            When using this strategy, the Reader will determine the schema by first parsing all data in the FlowFile, keeping track of all fields that it has encountered
+            and the type of each field. Once all data has been parsed, a schema is formed that encompasses all fields that have been encountered.
+        </p>
+
+        <p>
+            A common concern when inferring schemas is how to handle the condition of two values that have different types. For example, a custom extension field might
+            have a Float value in one record and a String value in another. In these cases, the inferred schema will contain a CHOICE data type with FLOAT and STRING options. Records will
+            be allowed to have either value for the particular field.
+        </p>
+
+        <p>
+            The CEF format specifies not only the message format but also directives for the content. Because of this, the data type of some
+            fields is not determined by the actual value(s) in the FlowFile but by the CEF format. This includes header fields, which always have to appear and
+            comply with the data types defined in the CEF format. Also, extension fields from the Extension Dictionary might or might not appear in the generated
+            schema based on the FlowFile content, but when an extension field is added, its data type is bound by the CEF format. Custom extensions have no similar
+            restrictions; their presence in the schema depends entirely on the FlowFile content.
+        </p>
+
+        <p>
+            Schema inference in CEFReader supports multiple strategies for convenience. These strategies determine which fields from the incoming CEF messages should be
+            included in the schema. With this, one might filter out custom extensions, for example. It is important to note that this has a significant effect on
+            every subsequent step of record processing. For example, using an Inference Strategy which omits fields together with the ConvertRecord Processor will result in Records
+            containing only a subset of the original fields.
+        </p>
+
+        <h3>Headers only</h3>
+
+        <p>
+            Using this strategy will result in a schema which contains only the header fields from the incoming message. All other fields (standard or custom
+            extensions) will be ignored. The data types of the header fields are defined by the CEF format, regardless of the content of the
+            message used as a template.
+        </p>
+
+        <h3>Headers and extensions</h3>
+
+        <p>
+            In addition to the header fields, this strategy will include standard extensions from the messages in the FlowFile. This means that not all
+            standard extensions will be part of the outgoing Record, only the ones the Schema Inference found in the incoming messages. The data types of these Record
+            fields are determined by the CEF format, ignoring the actual value in the observed field.
+        </p>
+
+        <h3>With custom extensions inferred</h3>
+
+        <p>
+            While the types of the header and standard extension fields are bound by the CEF format, it is possible to add further fields to the message, called
+            "custom extensions". These fields are not part of the "Extension Dictionary", thus their data type is not predefined. Using the "With custom extensions inferred"
+            Inference Strategy, the CEFReader tries to determine the possible data type for these custom extension fields based on their value.
+        </p>
+
+        <h3>With custom extensions as strings</h3>
+
+        <p>
+            In some cases it is undesirable to let the Reader determine the type of the custom extensions. For convenience, CEFReader provides an Inference Strategy which,
+            regardless of their value, considers custom extension fields as String data. Otherwise, this strategy behaves like "With custom extensions inferred".
+        </p>
+
+        <h2>Caching of Inferred Schemas</h2>
+
+        <p>
+            This Record Reader requires that, if a schema is to be inferred, all records be read in order to ensure that the schema that gets inferred is applicable to all
+            records in the FlowFile. However, this can become expensive, especially if the data undergoes many different transformations. To alleviate the cost of inferring schemas,
+            the Record Reader can be configured with a "Schema Inference Cache" by populating the property with that name. This is a Controller Service that can be shared by Record
+            Readers and Record Writers.
+        </p>
+
+        <p>
+            Whenever a Record Writer is used to write data, if it is configured with a "Schema Cache," it will also add the schema to the Schema Cache. This will result in an
+            identifier for that schema being added as an attribute to the FlowFile.
+        </p>
+
+        <p>
+            Whenever a Record Reader is used to read data, if it is configured with a "Schema Inference Cache", it will first look for a "schema.cache.identifier" attribute on the FlowFile.
+            If the attribute exists, it will use the value of that attribute to look up the schema in the schema cache. If it is able to find a schema in the cache with that identifier,
+            then it will use that schema instead of reading, parsing, and analyzing the data to infer the schema. If the attribute is not available on the FlowFile, or if the attribute is
+            available but the cache does not have a schema with that identifier, then the Record Reader will proceed to infer the schema as described above.
+        </p>
+
+        <p>
+            The end result is that users are able to chain together many different Processors to operate on Record-oriented data. Typically, only the first such Processor in the chain will
+            incur the "penalty" of inferring the schema. For all other Processors in the chain, the Record Reader is able to simply look up the schema in the Schema Cache by identifier.
+            This allows the Record Reader to infer a schema accurately, since it is inferred based on all data in the FlowFile, and still allows this to happen efficiently since the schema
+            will typically only be inferred once, regardless of how many Processors handle the data.
+        </p>
+
+        <h2>Handling invalid events</h2>
+
+        <p>
+            An event is considered invalid if it is malformed in a way that the underlying CEF parser cannot read it properly. CEFReader has two ways to deal with malformed events, determined by
+            the usage of the "Invalid Field" property. If the property is not set, reading will fail at the first invalid event. If the property is set, each invalid event
+            will be read as a record with a single field. The field is named based on the property and the value of the field will be the original event text. By default, the "Invalid Field" property is not set.
+        </p>
+
+        <p>
+            When the "Invalid Field" property is set, the read records might include both records representing well-formed CEF events and records representing malformed ones. Because of this, further steps might be needed
+            in order to separate them before further processing.
+        </p>
+	</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java
new file mode 100644
index 0000000..7a1a5b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tests for {@link CEFReader}. Each test configures a reader on a {@link TestRunner}, feeds a CEF input file through
+ * a minimal processor that collects every record produced by the reader, and then asserts on the collected records.
+ */
+public class TestCEFReader {
+    private TestRunner runner;
+    private TestCEFProcessor processor;
+    private CEFReader reader;
+
+    @BeforeEach
+    public void setUp() {
+        // Each test builds its own runner, processor and reader through setUpReader()
+        runner = null;
+        processor = null;
+        reader = null;
+    }
+
+    @Test
+    public void testValidatingReaderWhenRawFieldValueIsInvalid() throws Exception {
+        setUpReader();
+        setRawField("dst"); // Invalid because there is an extension with the same name
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+
+        assertReaderIsInvalid();
+    }
+
+    @Test
+    public void testReadingSingleRowWithHeaderFields() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithHeaderFieldsAndRaw() throws Exception {
+        setUpReader();
+        setRawField(TestCEFUtil.RAW_FIELD);
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, Collections.singletonMap(TestCEFUtil.RAW_FIELD, TestCEFUtil.RAW_VALUE));
+    }
+
+    @Test
+    public void testReadingSingleRowWithExtensionFieldsWhenSchemaIsHeadersOnly() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithCustomExtensionFieldsAsStrings() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_AS_STRINGS);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, "123"));
+    }
+
+    @Test
+    public void testMisformattedRowsWithoutInvalidFieldIsBeingSet() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessorWithError(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+    }
+
+    @Test
+    public void testMisformattedRowsWithInvalidFieldIsSet() throws Exception {
+        setUpReader();
+        setInvalidField();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+
+        assertNumberOfResults(3);
+        assertFieldIsSet(1, "invalid", "Oct 12 04:16:11 localhost CEF:0|nxlog.org|nxlog|2.7.1243|");
+    }
+
+    @Test
+    public void testReadingSingleRowWithCustomExtensionFields() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 123));
+    }
+
+    @Test
+    public void testReadingMultipleRows() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_IDENTICAL_ROWS);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsWithEmptyRows() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsStartingWithEmptyRow() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testWithPredefinedSchema() throws Exception {
+        setUpReader();
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithEmptyExtensionFields() throws Exception {
+        setUpReader();
+        setAcceptEmptyExtensions();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, ""));
+    }
+
+    /**
+     * Creates the collecting processor and its runner, and registers a new CEFReader as the "reader" controller service.
+     */
+    private void setUpReader() throws InitializationException {
+        processor = new TestCEFProcessor();
+        runner = TestRunners.newTestRunner(processor);
+        reader = new CEFReader();
+        runner.addControllerService("reader", reader);
+        runner.setProperty(TestCEFProcessor.READER, "reader");
+    }
+
+    private void setAcceptEmptyExtensions() {
+        runner.setProperty(reader, CEFReader.ACCEPT_EMPTY_EXTENSIONS, "true");
+    }
+
+    /**
+     * Configures the reader to infer its schema using the given CEF inference strategy.
+     */
+    private void setSchemaIsInferred(final AllowableValue value) {
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+        runner.setProperty(reader, CEFReader.INFERENCE_STRATEGY, value);
+    }
+
+    private void setInvalidField() {
+        runner.setProperty(reader, CEFReader.INVALID_FIELD, "invalid");
+    }
+
+    /**
+     * Registers the given fields as "predefinedSchema" in a mock schema registry and points the reader at it by schema name.
+     */
+    private void setSchema(List<RecordField> fields) throws InitializationException {
+        final MockSchemaRegistry registry = new MockSchemaRegistry();
+        registry.addSchema("predefinedSchema", new SimpleRecordSchema(fields));
+        runner.addControllerService("registry", registry);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_NAME, "predefinedSchema");
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+        runner.enableControllerService(registry);
+    }
+
+    private void setRawField(final String value) {
+        runner.setProperty(reader, CEFReader.RAW_FIELD, value);
+    }
+
+    private void enableReader() {
+        runner.assertValid(reader);
+        runner.enableControllerService(reader);
+    }
+
+    /**
+     * Enqueues the content of the given file, runs the processor once and expects the FlowFile to be routed to success.
+     *
+     * @param input path of the file holding the CEF input
+     * @throws FileNotFoundException if the input file does not exist
+     * @throws IOException if the input file cannot be read or closed
+     */
+    private void triggerProcessor(final String input) throws IOException {
+        enqueueInput(input);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestCEFProcessor.SUCCESS);
+    }
+
+    /**
+     * Enqueues the content of the given file and expects the run to fail with an IOException raised while reading,
+     * wrapped into a RuntimeException by {@link TestCEFProcessor}.
+     *
+     * @param input path of the file holding the CEF input
+     * @throws FileNotFoundException if the input file does not exist
+     * @throws IOException if the input file cannot be read or closed
+     */
+    private void triggerProcessorWithError(final String input) throws IOException {
+        enqueueInput(input);
+        final AssertionError exception = Assertions.assertThrows(AssertionError.class, () -> runner.run());
+        Assertions.assertTrue(exception.getCause() instanceof RuntimeException);
+        Assertions.assertTrue(exception.getCause().getCause() instanceof IOException);
+    }
+
+    /**
+     * Enqueues the content of the given file as a new FlowFile and closes the file stream afterwards, instead of
+     * leaving it open until garbage collection.
+     * NOTE(review): assumes TestRunner.enqueue(InputStream) consumes the stream before returning; confirm.
+     */
+    private void enqueueInput(final String input) throws IOException {
+        try (final InputStream in = new FileInputStream(input)) {
+            runner.enqueue(in);
+        }
+    }
+
+    private void assertNumberOfResults(final int numberOfResults) {
+        Assertions.assertEquals(numberOfResults, processor.getRecords().size());
+    }
+
+    private void assertReaderIsInvalid() {
+        runner.assertNotValid(reader);
+    }
+
+    /**
+     * Asserts that the record at the given (zero-based) position has the expected value in the named field.
+     */
+    private void assertFieldIsSet(final int number, final String name, final String value) {
+        Assertions.assertEquals(value, processor.getRecords().get(number).getValue(name));
+    }
+
+    /**
+     * Merges the given field groups (later groups win on duplicate names) and asserts every collected record matches them.
+     */
+    @SafeVarargs
+    private void assertFieldsAre(final Map<String, Object>... fieldGroups) {
+        final Map<String, Object> expectedFields = new HashMap<>();
+        Arrays.stream(fieldGroups).forEach(expectedFields::putAll);
+        processor.getRecords().forEach(r -> TestCEFUtil.assertFieldsAre(r, expectedFields));
+    }
+
+    /**
+     * Minimal processor that reads each incoming FlowFile with the configured {@link RecordReaderFactory}, keeps every
+     * produced record in memory for later assertions, and routes the FlowFile to success.
+     */
+    private static class TestCEFProcessor extends AbstractProcessor {
+        private final List<Record> records = new ArrayList<>();
+
+        static final Relationship SUCCESS = new Relationship.Builder().name("success").build();
+
+        static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
+                .name("reader")
+                .identifiesControllerService(CEFReader.class)
+                .build();
+
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+            final FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                return;
+            }
+
+            final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);
+
+            try (
+                final InputStream in = session.read(flowFile);
+                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())
+            ) {
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    records.add(record);
+                }
+            } catch (final Exception e) {
+                // Wrapping keeps the original cause; triggerProcessorWithError relies on this RuntimeException -> cause chain
+                throw new RuntimeException(e);
+            }
+
+            session.transfer(flowFile, SUCCESS);
+        }
+
+        @Override
+        public Set<Relationship> getRelationships() {
+            return Collections.singleton(SUCCESS);
+        }
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            return Collections.singletonList(READER);
+        }
+
+        public List<Record> getRecords() {
+            return records;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFRecordReader.java
new file mode 100644
index 0000000..b236222
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFRecordReader.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import javax.validation.Validation;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Unit tests for {@link CEFRecordReader}. Each test configures a schema and the reader options, opens one of the
+ * CEF input files referenced by {@link TestCEFUtil}, reads the records and verifies them.
+ */
+public class TestCEFRecordReader {
+    private final javax.validation.Validator validator = Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    private RecordSchema schema;
+    private InputStream inputStream;
+    private CEFRecordReader testSubject;
+    private List<Record> results;
+    private boolean includeCustomExtensions;
+    private boolean acceptEmptyExtensions;
+    private boolean dropUnknownFields;
+    private String rawField;
+    private String invalidField;
+
+    @BeforeEach
+    public void setUp() {
+        schema = null;
+        inputStream = null;
+        testSubject = null;
+        results = null;
+        includeCustomExtensions = false;
+        acceptEmptyExtensions = false;
+        dropUnknownFields = true;
+        rawField = null;
+        invalidField = null;
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        // The reader is built on top of this stream, so closing it releases the file handle
+        if (inputStream != null) {
+            inputStream.close();
+        }
+    }
+
+    @Test
+    public void testReadingSingleLineWithHeaderFieldsUsingSchemaWithHeaderFields() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleLineWithExtensionFieldsUsingSchemaWithHeaderFields() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleLineWithCustomExtensionFieldsUsingSchemaWithHeaderFields() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleLineWithHeaderFieldsUsingSchemaWithExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldWithExtensions());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, withNulls(TestCEFUtil.EXPECTED_EXTENSION_VALUES));
+    }
+
+    @Test
+    public void testReadingSingleLineWithExtensionFieldsUsingSchemaWithExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldWithExtensions());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleLineWithCustomExtensionFieldsUsingSchemaWithExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldWithExtensions());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleLineWithHeaderFieldsUsingSchemaWithCustomExtensionFieldsAsStrings() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD_AS_STRING));
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, withNulls(TestCEFUtil.EXPECTED_EXTENSION_VALUES), Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, null));
+    }
+
+    @Test
+    public void testReadingSingleLineWithExtensionFieldsUsingSchemaWithCustomExtensionFieldsAsStrings() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD_AS_STRING));
+        setIncludeCustomExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, null));
+    }
+
+    @Test
+    public void testReadingSingleLineWithCustomExtensionFieldsUsingSchemaWithCustomExtensionFieldsAsStrings() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD_AS_STRING));
+        setIncludeCustomExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, "123"));
+    }
+
+    @Test
+    public void testReadingSingleLineWithHeaderFieldsUsingSchemaWithCustomExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD));
+        setIncludeCustomExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, withNulls(TestCEFUtil.EXPECTED_EXTENSION_VALUES), Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, null));
+    }
+
+    @Test
+    public void testReadingSingleLineWithExtensionFieldsUsingSchemaWithCustomExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD));
+        setIncludeCustomExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, null));
+    }
+
+    @Test
+    public void testReadingSingleLineWithCustomExtensionFieldsUsingSchemaWithCustomExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD));
+        setIncludeCustomExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 123));
+    }
+
+    @Test
+    public void testReadingContainingRawEvent() throws Exception {
+        setSchema(getFieldsWithHeaderAndRaw());
+        setRawMessageField(TestCEFUtil.RAW_FIELD);
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, Collections.singletonMap(TestCEFUtil.RAW_FIELD, TestCEFUtil.RAW_VALUE));
+    }
+
+    @Test
+    public void testSingleLineWithHeaderFieldsAndNotDroppingUnknownFields() throws Exception {
+        setKeepUnknownFields();
+        setSchema(getFieldsWithExtensionsExcept("c6a1", "dmac"));
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, without(TestCEFUtil.EXPECTED_EXTENSION_VALUES, "c6a1", "dmac"));
+        // Fields missing from the schema are still expected on the record because unknown fields are kept
+        assertFieldIs("c6a1", TestCEFUtil.EXPECTED_EXTENSION_VALUES.get("c6a1"));
+        assertFieldIs("dmac", TestCEFUtil.EXPECTED_EXTENSION_VALUES.get("dmac"));
+    }
+
+    @Test
+    public void testReadingEmptyRow() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_EMPTY_ROW);
+
+        readRecords();
+
+        assertNumberOfResults(0);
+    }
+
+    @Test
+    public void testReadingMisformattedRow() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+
+        Assertions.assertThrows(MalformedRecordException.class, this::readRecords);
+    }
+
+    @Test
+    public void testReadingMisformattedRowWhenInvalidFieldIsSet() throws Exception {
+        setInvalidField();
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+
+        readRecords();
+
+        assertNumberOfResults(3);
+    }
+
+    @Test
+    public void testReadingMultipleIdenticalRows() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_MULTIPLE_IDENTICAL_ROWS);
+
+        readRecords();
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleIdenticalRowsWithEmptyOnes() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS);
+
+        readRecords(5);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsWithDecreasingNumberOfExtensionFields() throws Exception {
+        setSchema(TestCEFUtil.getFieldWithExtensions());
+        setReader(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(2);
+        assertFieldsAre(0, TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES);
+        assertFieldsAre(1, TestCEFUtil.EXPECTED_HEADER_VALUES, withNulls(TestCEFUtil.EXPECTED_EXTENSION_VALUES, "c6a1", "dmac"));
+    }
+
+    @Test
+    public void testReadingMultipleRowsWithIncreasingNumberOfExtensionFields() throws Exception {
+        setSchema(getFieldsWithExtensionsExcept("c6a1", "dmac"));
+        setReader(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS);
+
+        readRecords();
+
+        assertNumberOfResults(2);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, without(TestCEFUtil.EXPECTED_EXTENSION_VALUES, "c6a1", "dmac"));
+    }
+
+    @Test
+    public void testReadingIncorrectHeaderField() throws Exception {
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD);
+
+        Assertions.assertThrows(NumberFormatException.class, this::readRecords);
+    }
+
+    @Test
+    public void testReadingIncorrectCustomVariableFormat() throws Exception {
+        setSchema(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD));
+        setIncludeCustomExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS);
+
+        Assertions.assertThrows(NumberFormatException.class, this::readRecords);
+    }
+
+    @Test
+    public void testReadingEmptyValuesWhenAcceptingEmptyExtensions() throws Exception {
+        setSchema(TestCEFUtil.getFieldWithExtensions());
+        setAcceptEmptyExtensions();
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION);
+
+        readRecords();
+
+        assertNumberOfResults(1);
+        assertSchemaIsSet();
+
+        final Map<String, Object> expectedExtensionValues = new HashMap<>(TestCEFUtil.EXPECTED_EXTENSION_VALUES);
+        expectedExtensionValues.put("cn1", null);
+        expectedExtensionValues.put("cn1Label", "");
+
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, expectedExtensionValues);
+    }
+
+    @Test
+    public void testReadingEmptyValuesWhenNotAcceptingEmptyExtensions() throws Exception {
+        setSchema(TestCEFUtil.getFieldWithExtensions());
+        setReader(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION);
+
+        // As empty is not accepted, number type fields will not be parsed properly
+        Assertions.assertThrows(NumberFormatException.class, this::readRecords);
+    }
+
+    /** Header fields followed by a string field holding the raw CEF message. */
+    private static List<RecordField> getFieldsWithHeaderAndRaw() {
+        final List<RecordField> result = new ArrayList<>(CEFSchemaUtil.getHeaderFields());
+        result.add(new RecordField(TestCEFUtil.RAW_FIELD, RecordFieldType.STRING.getDataType()));
+        return result;
+    }
+
+    /** Header and extension fields, minus the fields whose names are listed in {@code fieldsToExclude}. */
+    private static List<RecordField> getFieldsWithExtensionsExcept(final String... fieldsToExclude) {
+        final Set<String> excludedFields = new HashSet<>(Arrays.asList(fieldsToExclude));
+        final List<RecordField> result = new ArrayList<>();
+
+        for (final RecordField field : TestCEFUtil.getFieldWithExtensions()) {
+            if (!excludedFields.contains(field.getFieldName())) {
+                result.add(field);
+            }
+        }
+
+        return result;
+    }
+
+    private void setSchema(final List<RecordField> fields) {
+        this.schema = new SimpleRecordSchema(fields);
+    }
+
+    private void setIncludeCustomExtensions() {
+        includeCustomExtensions = true;
+    }
+
+    private void setAcceptEmptyExtensions() {
+        acceptEmptyExtensions = true;
+    }
+
+    private void setRawMessageField(final String rawField) {
+        this.rawField = rawField;
+    }
+
+    private void setKeepUnknownFields() {
+        dropUnknownFields = false;
+    }
+
+    private void setInvalidField() {
+        invalidField = "invalid";
+    }
+
+    /**
+     * Opens {@code file} and builds the reader under test from the options set so far,
+     * so every option setter must be called before this method.
+     */
+    private void setReader(final String file) throws FileNotFoundException {
+        inputStream = new FileInputStream(file);
+        testSubject = new CEFRecordReader(inputStream, schema, parser, new MockComponentLogger(), Locale.US, rawField, invalidField, includeCustomExtensions, acceptEmptyExtensions);
+    }
+
+    /** Reads records until the reader returns {@code null} and stores them in {@link #results}. */
+    private void readRecords() throws IOException, MalformedRecordException {
+        final List<Record> readRecords = new ArrayList<>();
+        Record record;
+
+        while ((record = testSubject.nextRecord(true, dropUnknownFields)) != null) {
+            readRecords.add(record);
+        }
+
+        this.results = readRecords;
+    }
+
+    /**
+     * Calls {@code nextRecord} exactly {@code count} times, keeping only the non-null records.
+     * Used where the input contains empty rows.
+     */
+    private void readRecords(final int count) throws IOException, MalformedRecordException {
+        final List<Record> readRecords = new ArrayList<>();
+
+        for (int i = 0; i < count; i++) {
+            final Record record = testSubject.nextRecord(true, dropUnknownFields);
+
+            if (record != null) {
+                readRecords.add(record);
+            }
+        }
+
+        this.results = readRecords;
+    }
+
+    /** Asserts that every read record contains exactly the union of the given field groups. */
+    @SafeVarargs
+    private final void assertFieldsAre(final Map<String, Object>... fieldGroups) {
+        assertResultsPresent();
+        final Map<String, Object> expectedFields = mergeFieldGroups(fieldGroups);
+        results.forEach(r -> TestCEFUtil.assertFieldsAre(r, expectedFields));
+    }
+
+    /** Asserts that the record at index {@code number} contains exactly the union of the given field groups. */
+    @SafeVarargs
+    private final void assertFieldsAre(final int number, final Map<String, Object>... fieldGroups) {
+        final Map<String, Object> expectedFields = mergeFieldGroups(fieldGroups);
+        TestCEFUtil.assertFieldsAre(results.get(number), expectedFields);
+    }
+
+    private void assertFieldIs(final String field, final Object expectedValue) {
+        assertResultsPresent();
+        results.forEach(r -> Assertions.assertEquals(expectedValue, r.getValue(field)));
+    }
+
+    private void assertNumberOfResults(final int numberOfResults) {
+        Assertions.assertEquals(numberOfResults, results.size());
+    }
+
+    private void assertSchemaIsSet() {
+        assertResultsPresent();
+        results.forEach(r -> Assertions.assertEquals(schema, r.getSchema()));
+    }
+
+    /** Guards the per-record assertions above against passing vacuously when nothing was read. */
+    private void assertResultsPresent() {
+        Assertions.assertFalse(results.isEmpty(), "Expected at least one record to verify");
+    }
+
+    /** Merges the field groups into one map; later groups override earlier ones on equal keys. */
+    @SafeVarargs
+    private static Map<String, Object> mergeFieldGroups(final Map<String, Object>... fieldGroups) {
+        final Map<String, Object> merged = new HashMap<>();
+        for (final Map<String, Object> fieldGroup : fieldGroups) {
+            merged.putAll(fieldGroup);
+        }
+        return merged;
+    }
+
+    /** Copy of the keys of {@code expectedFields}, every value replaced with {@code null}. */
+    private static Map<String, Object> withNulls(final Map<String, Object> expectedFields) {
+        final Map<String, Object> result = new HashMap<>();
+        expectedFields.keySet().forEach(f -> result.put(f, null));
+        return result;
+    }
+
+    /** Copy of {@code expectedFields} where only the listed fields are set to {@code null}. */
+    private static Map<String, Object> withNulls(final Map<String, Object> expectedFields, final String... fieldsWithNullValue) {
+        final Map<String, Object> result = new HashMap<>(expectedFields);
+        for (final String field : fieldsWithNullValue) {
+            result.put(field, null);
+        }
+        return result;
+    }
+
+    /** Copy of {@code expectedFields} with the listed fields removed. */
+    private static Map<String, Object> without(final Map<String, Object> expectedFields, final String... fieldsToRemove) {
+        final Map<String, Object> result = new HashMap<>(expectedFields);
+        result.keySet().removeAll(Arrays.asList(fieldsToRemove));
+        return result;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFSchemaInference.java
new file mode 100644
index 0000000..2661e20
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFSchemaInference.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.schema.inference.RecordSource;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.validation.Validation;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Unit tests for {@link CEFSchemaInference}. Each test opens one of the CEF input files referenced by
+ * {@link TestCEFUtil} as a {@link CEFRecordSource}, infers a schema from it and compares the result
+ * with the expected fields.
+ */
+public class TestCEFSchemaInference {
+
+    private final javax.validation.Validator validator = Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    private boolean includeExtensions;
+    private boolean includeCustomExtensions;
+    private String rawMessageField;
+    private CEFSchemaInference testSubject;
+    private FileInputStream inputStream;
+    private RecordSource<CommonEvent> recordSource;
+    private CEFCustomExtensionTypeResolver typeResolver;
+    private RecordSchema result;
+
+    @BeforeEach
+    public void setUp() {
+        includeExtensions = false;
+        includeCustomExtensions = false;
+        rawMessageField = null;
+        testSubject = null;
+        inputStream = null;
+        recordSource = null;
+        typeResolver = null;
+        result = null;
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        // The record source reads from this stream; close it so no file handle leaks between tests
+        if (inputStream != null) {
+            inputStream.close();
+        }
+    }
+
+    @Test
+    public void testInferBasedOnHeaderFields() throws Exception {
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields());
+    }
+
+    @Test
+    public void testInferBasedOnHeaderFieldsWhenIncludingExtensions() throws Exception {
+        setIncludeExtensions();
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields());
+    }
+
+    @Test
+    public void testInferBasedOnHeaderFieldsWithRaw() throws Exception {
+        setRawMessageField("raw");
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields(), Collections.singletonList(new RecordField("raw", RecordFieldType.STRING.getDataType())));
+    }
+
+    @Test
+    public void testInferBasedOnHeaderAndExtensionFieldsWhenNotIncludingExtensions() throws Exception {
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields());
+    }
+
+    @Test
+    public void testInferBasedOnHeaderAndExtensionFieldsWhenIncludingExtensions() throws Exception {
+        setIncludeExtensions();
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(TestCEFUtil.getFieldWithExtensions());
+    }
+
+    @Test
+    public void testInferBasedOnRowWithCustomExtensionsButSkipping() throws Exception {
+        setIncludeExtensions();
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(TestCEFUtil.getFieldWithExtensions());
+    }
+
+    @Test
+    public void testInferBasedOnRowWithCustomExtensionsAsString() throws Exception {
+        setIncludeExtensions();
+        setIncludeCustomExtensions(CEFCustomExtensionTypeResolver.STRING_RESOLVER);
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD_AS_STRING));
+    }
+
+    @Test
+    public void testInferBasedOnRowWithCustomExtensions() throws Exception {
+        setIncludeExtensions();
+        setIncludeCustomExtensions(CEFCustomExtensionTypeResolver.SIMPLE_RESOLVER);
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD));
+    }
+
+    @Test
+    public void testInferBasedOnRowWithEmptyExtensions() throws Exception {
+        setIncludeExtensions();
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION);
+        setUpTestSubject();
+
+        inferSchema();
+
+        // As extension types are defined by the extension dictionary, even with empty value the data type can be inferred
+        assertSchemaConsistsOf(TestCEFUtil.getFieldWithExtensions());
+    }
+
+    @Test
+    public void testInferBasedOnRowWithEmptyCustomExtensions() throws Exception {
+        setIncludeExtensions();
+        setIncludeCustomExtensions(CEFCustomExtensionTypeResolver.SIMPLE_RESOLVER);
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS);
+        setUpTestSubject();
+
+        inferSchema();
+
+        // As there is no value provided to infer based on, the empty custom field will be considered as string
+        assertSchemaConsistsOf(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD_AS_STRING));
+    }
+
+    @Test
+    public void testInferWhenStartingWithEmptyRow() throws Exception {
+        setRecordSource(TestCEFUtil.INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields());
+    }
+
+    @Test
+    public void testInferWithDifferentCustomFieldTypes() throws Exception {
+        setIncludeExtensions();
+        setIncludeCustomExtensions(CEFCustomExtensionTypeResolver.SIMPLE_RESOLVER);
+        setRecordSource(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(TestCEFUtil.getFieldsWithCustomExtensions(TestCEFUtil.CUSTOM_EXTENSION_FIELD_AS_CHOICE));
+    }
+
+    @Test
+    public void testInferBasedOnEmptyRow() throws Exception {
+        setRecordSource(TestCEFUtil.INPUT_EMPTY_ROW);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields()); // For CEF it is always expected to have a header
+    }
+
+    @Test
+    public void testInferBasedOnMisformattedRow() throws Exception {
+        setRecordSource(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+        setUpTestSubject();
+
+        Assertions.assertThrows(IOException.class, this::inferSchema);
+    }
+
+    @Test
+    public void testInferBasedOnMisformattedRowWhenNonFailFast() throws Exception {
+        setRecordSourceWhenNonFailFast(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+        setUpTestSubject();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields());
+    }
+
+    @Test
+    public void testInferBasedOnHeaderFieldsWithInvalid() throws Exception {
+        setRecordSource(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+        setUpTestSubjectWithInvalidField();
+
+        inferSchema();
+
+        assertSchemaConsistsOf(CEFSchemaUtil.getHeaderFields(), Collections.singletonList(new RecordField("invalid", RecordFieldType.STRING.getDataType())));
+    }
+
+    private void setIncludeExtensions() {
+        this.includeExtensions = true;
+    }
+
+    private void setIncludeCustomExtensions(final CEFCustomExtensionTypeResolver typeResolver) {
+        this.includeCustomExtensions = true;
+        this.typeResolver = typeResolver;
+    }
+
+    /** Builds the inference under test from the options set so far, without an invalid-message field. */
+    private void setUpTestSubject() {
+        this.testSubject = new CEFSchemaInference(includeExtensions, includeCustomExtensions, typeResolver, rawMessageField, null);
+    }
+
+    /** Builds the inference under test with "invalid" as the invalid-message field name. */
+    private void setUpTestSubjectWithInvalidField() {
+        this.testSubject = new CEFSchemaInference(includeExtensions, includeCustomExtensions, typeResolver, rawMessageField, "invalid");
+    }
+
+    private void setRawMessageField(final String rawMessageField) {
+        this.rawMessageField = rawMessageField;
+    }
+
+    private void setRecordSource(final String inputFile) throws FileNotFoundException {
+        setRecordSource(inputFile, true);
+    }
+
+    /**
+     * Opens {@code inputFile} and wraps it in a {@link CEFRecordSource}; the stream is kept in a field so
+     * {@link #tearDown()} can close it.
+     */
+    private void setRecordSource(final String inputFile, final boolean failFast) throws FileNotFoundException {
+        inputStream = new FileInputStream(inputFile);
+        // NOTE(review): the literal 'true' is passed through unchanged; its meaning is defined by CEFRecordSource
+        this.recordSource = new CEFRecordSource(inputStream, parser, Locale.US, true, failFast);
+    }
+
+    private void setRecordSourceWhenNonFailFast(final String inputFile) throws FileNotFoundException {
+        setRecordSource(inputFile, false);
+    }
+
+    private void inferSchema() throws IOException {
+        result = testSubject.inferSchema(recordSource);
+    }
+
+    /** Asserts the inferred schema has exactly the given fields (matched by name, compared by equality). */
+    @SafeVarargs
+    private final void assertSchemaConsistsOf(final List<RecordField>... expectedFieldGroups) {
+        final List<RecordField> expectedFields = Arrays.stream(expectedFieldGroups).flatMap(List::stream).collect(Collectors.toList());
+        Assertions.assertEquals(expectedFields.size(), result.getFieldCount());
+
+        for (final RecordField expectedField : expectedFields) {
+            final Optional<RecordField> field = result.getField(expectedField.getFieldName());
+            Assertions.assertTrue(field.isPresent(), "Missing field " + expectedField.getFieldName());
+            Assertions.assertEquals(expectedField, field.get());
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
new file mode 100644
index 0000000..3138202
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class TestCEFUtil {
+    static final String RAW_FIELD = "raw";
+    static final String RAW_VALUE = "Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|";
+
+    static final String INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY = "src/test/resources/cef/single-row-header-fields-only.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EXTENSIONS = "src/test/resources/cef/single-row-with-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION = "src/test/resources/cef/single-row-with-empty-extension.txt";
+    static final String INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-empty-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD = "src/test/resources/cef/single-row-with-incorrect-header-field.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt";
+    static final String INPUT_EMPTY_ROW = "src/test/resources/cef/empty-row.txt";
+    static final String INPUT_MISFORMATTED_ROW = "src/test/resources/cef/misformatted-row.txt";
+    static final String INPUT_MULTIPLE_IDENTICAL_ROWS = "src/test/resources/cef/multiple-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES = "src/test/resources/cef/multiple-rows-with-different-custom-types.txt";
+    static final String INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW = "src/test/resources/cef/multiple-rows-starting-with-empty-row.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS = "src/test/resources/cef/multiple-rows-with-empty-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt";
+
+    static final Map<String, Object> EXPECTED_HEADER_VALUES = new HashMap<>();
+    static final Map<String, Object> EXPECTED_EXTENSION_VALUES = new HashMap<>();
+
+    static final String CUSTOM_EXTENSION_FIELD_NAME = "loginsequence";
+    static final RecordField CUSTOM_EXTENSION_FIELD = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.INT.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_STRING = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.STRING.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_CHOICE = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.FLOAT.getDataType(), RecordFieldType.STRING.getDataType()
+    ));
+
+    static {
+        EXPECTED_HEADER_VALUES.put("version", Integer.valueOf(0));
+        EXPECTED_HEADER_VALUES.put("deviceVendor", "Company");
+        EXPECTED_HEADER_VALUES.put("deviceProduct", "Product");
+        EXPECTED_HEADER_VALUES.put("deviceVersion", "1.2.3");
+        EXPECTED_HEADER_VALUES.put("deviceEventClassId", "audit-login");
+        EXPECTED_HEADER_VALUES.put("name", "Successful login");
+        EXPECTED_HEADER_VALUES.put("severity", "3");
+
+        EXPECTED_EXTENSION_VALUES.put("cn1Label", "userid");
+        EXPECTED_EXTENSION_VALUES.put("spt", Integer.valueOf(46117));
+        EXPECTED_EXTENSION_VALUES.put("cn1", Long.valueOf(99999));
+        EXPECTED_EXTENSION_VALUES.put("cfp1", Float.valueOf(1.23F));
+        EXPECTED_EXTENSION_VALUES.put("dst", "127.0.0.1");
+        EXPECTED_EXTENSION_VALUES.put("c6a1", "2345:425:2ca1:0:0:567:5673:23b5"); // Lower case representation is expected and values with leading zeroes shortened
+        EXPECTED_EXTENSION_VALUES.put("dmac", "00:0d:60:af:1b:61"); // Lower case representation is expected
+
+        EXPECTED_EXTENSION_VALUES.put("start", new Timestamp(1479152665000L));
+        // "end" appears as a zone-less date string in several inputs, so it is parsed in the JVM default time zone (NOTE(review): assumes the reader does the same)
+        try {
+            final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MMM-dd HH:mm:ss.SSS", Locale.US);
+            EXPECTED_EXTENSION_VALUES.put("end", simpleDateFormat.parse("2017-Jan-12 12:23:45.000"));
+        } catch (ParseException e) {
+            throw new RuntimeException(e);
+        }
+
+        EXPECTED_EXTENSION_VALUES.put("dlat", Double.valueOf(456.789D));
+    }
+
+    /**
+     * Returns the header fields followed by the extension fields of {@link #INPUT_SINGLE_ROW_WITH_EXTENSIONS}. The schema is expected to be inferred from the
+     * first event in the flow file, so it does not contain every possible extension field, and subsequent rows are handled based on this same schema.
+     */
+    static List<RecordField> getFieldWithExtensions() {
+        final List<RecordField> result = new ArrayList<>(CEFSchemaUtil.getHeaderFields());
+        result.add(new RecordField("cn1Label", RecordFieldType.STRING.getDataType()));
+        result.add(new RecordField("spt", RecordFieldType.INT.getDataType()));
+        result.add(new RecordField("cn1", RecordFieldType.LONG.getDataType()));
+        result.add(new RecordField("cfp1", RecordFieldType.FLOAT.getDataType()));
+        result.add(new RecordField("dst", RecordFieldType.STRING.getDataType()));
+        result.add(new RecordField("c6a1", RecordFieldType.STRING.getDataType()));
+        result.add(new RecordField("dmac", RecordFieldType.STRING.getDataType()));
+        result.add(new RecordField("start", RecordFieldType.TIMESTAMP.getDataType()));
+        result.add(new RecordField("end", RecordFieldType.TIMESTAMP.getDataType()));
+        result.add(new RecordField("dlat", RecordFieldType.DOUBLE.getDataType()));
+        return result;
+    }
+
+    /** Returns the fields of {@link #getFieldWithExtensions()} with the given custom extension fields appended in order. */
+    static List<RecordField> getFieldsWithCustomExtensions(RecordField... customExtensions) {
+        final List<RecordField> result = TestCEFUtil.getFieldWithExtensions();
+        for (final RecordField customExtension : customExtensions) {
+            result.add(customExtension);
+        }
+        return result;
+    }
+
+    /**
+     * Asserts that the record holds as many values as there are expected fields and that each expected field has the expected value. Values are compared
+     * one by one because Record internally keeps the unknown fields even if they are marked for dropping, so a direct comparison would not work as expected.
+     */
+    static void assertFieldsAre(final Record record, final Map<String, Object> expectedValues) {
+        Assert.assertEquals("Unexpected number of values in record; expected fields " + expectedValues.keySet(), expectedValues.size(), record.getValues().length);
+        expectedValues.forEach((fieldName, expectedValue) -> Assert.assertEquals("Field " + fieldName + " is incorrect", expectedValue, record.getValue(fieldName)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/util/TestSchemaInferenceUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/util/TestSchemaInferenceUtil.java
new file mode 100644
index 0000000..bacb21a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/util/TestSchemaInferenceUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.math.BigDecimal;
+import java.util.stream.Stream;
+
+/** Tests {@link SchemaInferenceUtil#getDataType} on raw string values, both without and with {@link TimeValueInference}. */
+class TestSchemaInferenceUtil {
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+    private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    private final TimeValueInference timeInference = new TimeValueInference(DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT);
+
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testWithoutTimeInference(final String input, final DataType expectedResult) {
+        Assertions.assertEquals(expectedResult, SchemaInferenceUtil.getDataType(input));
+    }
+
+    @ParameterizedTest
+    @MethodSource("dataForTimeInference")
+    public void testWithTimeInference(final String input, final DataType expectedResult) {
+        Assertions.assertEquals(expectedResult, SchemaInferenceUtil.getDataType(input, timeInference));
+    }
+
+    private static Stream<Arguments> data() {
+        return Stream.of(
+            Arguments.of(null, null),
+            Arguments.of("", null),
+
+            Arguments.of("true", RecordFieldType.BOOLEAN.getDataType()),
+            Arguments.of("false", RecordFieldType.BOOLEAN.getDataType()),
+            Arguments.of("TRUE", RecordFieldType.BOOLEAN.getDataType()),
+            Arguments.of("FALSE", RecordFieldType.BOOLEAN.getDataType()),
+            Arguments.of("tRUE", RecordFieldType.BOOLEAN.getDataType()),
+            Arguments.of("fALSE", RecordFieldType.BOOLEAN.getDataType()),
+
+            // Exact decimal expansion of Double.MAX_VALUE plus a fraction: presumably outside float range, hence DOUBLE rather than FLOAT
+            Arguments.of(new BigDecimal(Double.MAX_VALUE).toPlainString() + ".01", RecordFieldType.DOUBLE.getDataType()),
+            Arguments.of(String.valueOf(1.1D), RecordFieldType.FLOAT.getDataType()),
+
+            Arguments.of(String.valueOf(1), RecordFieldType.INT.getDataType()),
+            Arguments.of(String.valueOf(Long.MAX_VALUE), RecordFieldType.LONG.getDataType()),
+
+            Arguments.of("c", RecordFieldType.STRING.getDataType()),
+            Arguments.of("string", RecordFieldType.STRING.getDataType())
+        );
+    }
+
+    private static Stream<Arguments> dataForTimeInference() {
+        return Stream.of(
+            Arguments.of("2017-03-19", RecordFieldType.DATE.getDataType(DATE_FORMAT)),
+            Arguments.of("2017-3-19", RecordFieldType.STRING.getDataType()),
+            Arguments.of("2017-44-55", RecordFieldType.STRING.getDataType()),
+            Arguments.of("2017.03.19", RecordFieldType.STRING.getDataType()),
+
+            Arguments.of("12:13:14", RecordFieldType.TIME.getDataType(TIME_FORMAT)),
+            Arguments.of("12:13:0", RecordFieldType.TIME.getDataType(TIME_FORMAT)),
+            Arguments.of("12:3:0", RecordFieldType.TIME.getDataType(TIME_FORMAT)),
+            Arguments.of("25:13:14", RecordFieldType.STRING.getDataType()),
+            Arguments.of("25::14", RecordFieldType.STRING.getDataType()),
+
+            Arguments.of("2019-01-07T14:59:27.817Z", RecordFieldType.TIMESTAMP.getDataType(TIMESTAMP_FORMAT)),
+            Arguments.of("2019-01-07T14:59:27.Z", RecordFieldType.STRING.getDataType()),
+            Arguments.of("2019-01-07T14:59:27Z", RecordFieldType.STRING.getDataType()),
+            Arguments.of("2019-01-07T14:59:27.817", RecordFieldType.STRING.getDataType()),
+            Arguments.of("2019-01-07 14:59:27.817Z", RecordFieldType.STRING.getDataType())
+        );
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/empty-row.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/empty-row.txt
new file mode 100644
index 0000000..e69de29
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/misformatted-row.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/misformatted-row.txt
new file mode 100644
index 0000000..7801520
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/misformatted-row.txt
@@ -0,0 +1,3 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+Oct 12 04:16:11 localhost CEF:0|nxlog.org|nxlog|2.7.1243|
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.4|audit-login|Successful login|1|
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt
new file mode 100644
index 0000000..4f37182
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt
@@ -0,0 +1,2 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23 dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23 dst=127.0.0.1 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt
new file mode 100644
index 0000000..c70a816
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt
@@ -0,0 +1,2 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23 dst=127.0.0.1 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23 dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-starting-with-empty-row.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-starting-with-empty-row.txt
new file mode 100644
index 0000000..8fece0d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-starting-with-empty-row.txt
@@ -0,0 +1,8 @@
+
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+
+
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-with-different-custom-types.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-with-different-custom-types.txt
new file mode 100644
index 0000000..6e98cf4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-with-different-custom-types.txt
@@ -0,0 +1,2 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23  dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=1484220225000 dlat=456.789 loginsequence=123.12
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23  dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=1484220225000 dlat=456.789 loginsequence=undetermined
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-with-empty-rows.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-with-empty-rows.txt
new file mode 100644
index 0000000..fe97934
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows-with-empty-rows.txt
@@ -0,0 +1,4 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows.txt
new file mode 100644
index 0000000..b3231af
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/multiple-rows.txt
@@ -0,0 +1,3 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-header-fields-only.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-header-fields-only.txt
new file mode 100644
index 0000000..f99d429
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-header-fields-only.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-custom-extensions.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-custom-extensions.txt
new file mode 100644
index 0000000..bb569bb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-custom-extensions.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23  dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789 loginsequence=123
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-empty-custom-extensions.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-empty-custom-extensions.txt
new file mode 100644
index 0000000..544158e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-empty-custom-extensions.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23  dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789 loginsequence=
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-empty-extension.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-empty-extension.txt
new file mode 100644
index 0000000..8012c47
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-empty-extension.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label= spt=46117 cn1= cfp1=1.23 dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-extensions.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-extensions.txt
new file mode 100644
index 0000000..c7a7040
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-extensions.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23 dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=Jan 12 2017 12:23:45 dlat=456.789
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt
new file mode 100644
index 0000000..ea6d896
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|cn1Label=userid spt=46117 cn1=99999 cfp1=1.23  dst=127.0.0.1 c6a1=2345:0425:2CA1:0000:0000:0567:5673:23b5 dmac=00:0D:60:AF:1B:61 start=1479152665000 end=1484220225000 dlat=456.789 loginsequence=123.12
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-incorrect-header-field.txt b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-incorrect-header-field.txt
new file mode 100644
index 0000000..c212feb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/cef/single-row-with-incorrect-header-field.txt
@@ -0,0 +1 @@
+Oct 12 04:16:11 localhost CEF:Unknown|Company|Product|1.2.3|audit-login|Successful login|3|
\ No newline at end of file