Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/17 16:49:38 UTC

[GitHub] [nifi] simonbence opened a new pull request #5530: NIFI-9341 Adding record reader for CEF events

simonbence opened a new pull request #5530:
URL: https://github.com/apache/nifi/pull/5530


   [CFM-2310](https://jira.cloudera.com/browse/CFM-2310)
   
   Adding CEF message support for Records. This includes:
   - Record reader
   - Schema inference
   
   Most of the design decisions follow the similar CSV and Grok readers, with the solution adjusted to fit CEF-specific needs (especially the different field categories).
   
   To help with testing, [here](https://gist.github.com/simonbence/ed2741cbedb03f369b54ee0fd89b7eb6) is a simple template with example data.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] simonbence commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r792577922



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/ValidateLocale.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import java.util.Locale;
+
+/**
+ * This class is identical to {@code org.apache.nifi.processors.standard.ParseCEF.ValidateLocale}.

Review comment:
       I share your concerns about this, and I thought hard about how to approach it. In the end I decided to go this way, considering the amount of duplicated code. The remark is more of a reminder to keep in mind for now, but I think the two classes may evolve in different ways over time.







[GitHub] [nifi] asfgit closed pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #5530:
URL: https://github.com/apache/nifi/pull/5530


   





[GitHub] [nifi] exceptionfactory commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r793681389



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/ValidateLocale.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import java.util.Locale;
+
+/**
+ * This class is identical to {@code org.apache.nifi.processors.standard.ParseCEF.ValidateLocale}.

Review comment:
       Thanks for the reply; that seems reasonable given the narrow scope of this particular class.







[GitHub] [nifi] exceptionfactory commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r780558536



##########
File path: nifi-commons/nifi-record/pom.xml
##########
@@ -15,6 +15,14 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.7</version>

Review comment:
       Recommend upgrading to the more recent version 3.12.0:
   ```suggestion
               <version>3.12.0</version>
   ```

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
##########
@@ -124,6 +124,21 @@
             <artifactId>guava</artifactId>
             <version>27.0.1-jre</version>
         </dependency>
+        <dependency>
+            <groupId>com.fluenda</groupId>
+            <artifactId>parcefone</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bval</groupId>
+            <artifactId>bval-jsr</artifactId>
+            <version>1.1.2</version>

Review comment:
       This particular version is several years old; the current version is 2.0.5. Is it possible to use that version?







[GitHub] [nifi] pvillard31 commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1007342283


   Hey @simonbence - I gave this another try, and I'm a bit concerned about how this would work when processing a flow file that contains many CEF logs, one of which is not correctly formatted according to the CEF specification. Right now, if a single line cannot be parsed, the whole flow file fails (when using a processor such as ConvertRecord). Can we think of a solution that always parses a line successfully, but in a way that allows filtering between "good" and "bad" records? I'm wondering if, similarly to the RAW_FIELD property, we could have an INVALID_FIELD: if specified, an unparsable line would not be parsed, but a record with a single field containing the raw line would be produced. This way, with a processor like QueryRecord, it would be easy to convert from CEF to JSON while also separating bad and good records into two distinct flow files. Thoughts?
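
The fallback proposed above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class `InvalidLineFallbackSketch`, the method `toRecord`, the header field names, and the naive pipe splitting are all hypothetical, and none of it is the reader's actual implementation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the INVALID_FIELD idea; not the NiFi implementation.
final class InvalidLineFallbackSketch {
    private static final String PREFIX = "CEF:";

    // Illustrative names for the seven pipe-delimited CEF header fields.
    private static final String[] HEADER_NAMES = {
        "version", "deviceVendor", "deviceProduct", "deviceVersion",
        "deviceEventClassId", "name", "severity"
    };

    // Returns the header fields of a well-formed line, or a single-field
    // record that carries the raw line under the given invalid-field name.
    static Map<String, Object> toRecord(final String line, final String invalidField) {
        final int start = line.indexOf(PREFIX);
        if (start >= 0) {
            // Naive split: seven header fields plus an optional extension part.
            // Escaped pipes ("\|") are deliberately not handled in this sketch.
            final String[] parts = line.substring(start + PREFIX.length()).split("\\|", 8);
            if (parts.length >= HEADER_NAMES.length) {
                final Map<String, Object> record = new LinkedHashMap<>();
                for (int i = 0; i < HEADER_NAMES.length; i++) {
                    record.put(HEADER_NAMES[i], parts[i]);
                }
                return record;
            }
        }
        // Unparsable line: keep it as data instead of failing the whole input.
        return Collections.singletonMap(invalidField, line);
    }
}
```

With this shape, a downstream step can tell good and bad records apart simply by checking whether the invalid field is present.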





[GitHub] [nifi] pvillard31 commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1012067908


   Yep, case 1 is what I was expecting but I was surprised by the behavior of case 2 :)





[GitHub] [nifi] simonbence commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1024400114


   > Thanks for the updates and relocating to the DataType inference utilities @simonbence! This looks ready to merge pending a clean build. Will run through one more round of verification.
   
   Thanks! As for the data type: my original intention was to eliminate the duplicated code, and _DataTypeUtils_ seemed like a good place for that. But due to its specific use (schema inference), it is better to keep it close to the readers. This way we can also keep the well-established number checking (instead of adding one with limited capabilities).





[GitHub] [nifi] ottobackwards commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
ottobackwards commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-972188171


   Wow, this is really nice.  When I reviewed it, the first thing that came to mind is heterogeneous fields on a record by record basis. 
   
   I see that you have covered that in your excellent additionalDetails, but I think that you may want to find a way to briefly mention it in the property doc string.
   
   I can see folks being really surprised at what they get if they don't go to details first.
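
To make the surprise concrete, here is a minimal sketch of what heterogeneous fields can mean for an inferred schema. It assumes, purely for illustration, that each record contributes a map of field name to type name and that the merged schema keeps every type seen per field; the class and method names are hypothetical, and the reader's real rules are the ones described in its additionalDetails.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: merging per-record field types into one schema view.
final class HeterogeneousFieldsSketch {
    // For every field seen in any record, collect all type names observed.
    static Map<String, Set<String>> mergeFieldTypes(final List<Map<String, String>> perRecordTypes) {
        final Map<String, Set<String>> merged = new TreeMap<>();
        for (final Map<String, String> record : perRecordTypes) {
            for (final Map.Entry<String, String> field : record.entrySet()) {
                merged.computeIfAbsent(field.getKey(), key -> new TreeSet<>()).add(field.getValue());
            }
        }
        return merged;
    }
}
```

A field that appears as a number in one row and as text in another ends up with two candidate types, and a field present in only some rows still becomes part of the merged result. That is the kind of outcome a short note in the property description could prepare users for.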
   





[GitHub] [nifi] simonbence commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1007774608


   Thank you for your time and insights! For most of the design decisions I followed the existing patterns, but your concerns are valid. Based on your suggestions, I created a possible solution; please take a look. I also added some new test cases, which might make it easier to understand the possible implications of this change. Thanks!





[GitHub] [nifi] simonbence commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r785769553



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
##########
@@ -124,6 +124,21 @@
             <artifactId>guava</artifactId>
             <version>27.0.1-jre</version>
         </dependency>
+        <dependency>
+            <groupId>com.fluenda</groupId>
+            <artifactId>parcefone</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bval</groupId>
+            <artifactId>bval-jsr</artifactId>
+            <version>1.1.2</version>

Review comment:
       You are right in both cases: I used the versions from the original CEF processor implementation. Note: BVAL needed a validation dependency as well, as the new BVAL requires Java Validation API 2.0.







[GitHub] [nifi] exceptionfactory commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r793722602



##########
File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
##########
@@ -2306,4 +2307,47 @@ public static boolean isFittingNumberType(final Object value, final RecordFieldT
 
         return false;
     }
+
+    public static DataType inferSimpleDataType(final String value) {
+        if (value == null || value.isEmpty()) {
+            return null;
+        }
+
+        if (NumberUtils.isParsable(value)) {

Review comment:
       As mentioned in relation to the commons-lang3 dependency declaration, this reference should be replaced with a direct check.  Perhaps implementing a simple check for null and empty, together with moving the Boolean string check prior to this one would be sufficient?  It would also be helpful to add a unit test for this method.
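
One possible shape for such a direct check, without commons-lang3, is sketched below. The class name, the `SimpleType` enum, and the two regular expressions are assumptions made for this example; the real method returns a NiFi `DataType`, and its accepted number formats may differ from what these patterns allow.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: simple type inference using only the JDK.
final class SimpleTypeInferenceSketch {
    enum SimpleType { BOOLEAN, INT, LONG, DOUBLE, STRING }

    // Assumed formats: optional minus sign, ASCII digits, one decimal point.
    private static final Pattern INTEGRAL = Pattern.compile("-?\\d+");
    private static final Pattern DECIMAL = Pattern.compile("-?\\d+\\.\\d+");

    static SimpleType infer(final String value) {
        // Null and empty values carry no type information.
        if (value == null || value.isEmpty()) {
            return null;
        }
        // Boolean check comes first, before any numeric check.
        if ("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value)) {
            return SimpleType.BOOLEAN;
        }
        if (INTEGRAL.matcher(value).matches()) {
            try {
                Integer.parseInt(value);
                return SimpleType.INT;
            } catch (final NumberFormatException tooLargeForInt) {
                // Fall through and try the wider type.
            }
            try {
                Long.parseLong(value);
                return SimpleType.LONG;
            } catch (final NumberFormatException tooLargeForLong) {
                return SimpleType.STRING;
            }
        }
        if (DECIMAL.matcher(value).matches()) {
            return SimpleType.DOUBLE;
        }
        return SimpleType.STRING;
    }
}
```

Because each branch is a plain input/output mapping, a unit test for this kind of method can be a short table of strings and expected types.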

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class TestCEFUtil {
+    static final String RAW_FIELD = "raw";
+    static final String RAW_VALUE = "Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|";
+
+    static final String INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY = "src/test/resources/cef/single-row-header-fields-only.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EXTENSIONS = "src/test/resources/cef/single-row-with-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION = "src/test/resources/cef/single-row-with-empty-extension.txt";
+    static final String INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-empty-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD = "src/test/resources/cef/single-row-with-incorrect-header-field.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt";
+    static final String INPUT_EMPTY_ROW = "src/test/resources/cef/empty-row.txt";
+    static final String INPUT_MISFORMATTED_ROW = "src/test/resources/cef/misformatted-row.txt";
+    static final String INPUT_MULTIPLE_IDENTICAL_ROWS = "src/test/resources/cef/multiple-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES = "src/test/resources/cef/multiple-rows-with-different-custom-types.txt";
+    static final String INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW = "src/test/resources/cef/multiple-rows-starting-with-empty-row.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS = "src/test/resources/cef/multiple-rows-with-empty-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt";
+
+    static final Map<String, Object> EXPECTED_HEADER_VALUES = new HashMap<>();
+    static final Map<String, Object> EXPECTED_EXTENSION_VALUES = new HashMap<>();
+
+    static final String CUSTOM_EXTENSION_FIELD_NAME = "loginsequence";
+    static final RecordField CUSTOM_EXTENSION_FIELD = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.INT.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_STRING = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.STRING.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_CHOICE = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.FLOAT.getDataType(), RecordFieldType.STRING.getDataType()
+    ));
+
+    static {
+        EXPECTED_HEADER_VALUES.put("version", Integer.valueOf(0));

Review comment:
       `Integer.valueOf()` is unnecessary as the compiler will handle boxing.
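
As a small illustration of the point, the sketch below puts an `int` literal straight into a `Map<String, Object>`; the class and method names are made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the compiler boxes the int literal automatically.
final class AutoboxingSketch {
    static Map<String, Object> expectedHeaderValues() {
        final Map<String, Object> values = new HashMap<>();
        // No Integer.valueOf(0) needed; the literal is boxed to an Integer.
        values.put("version", 0);
        return values;
    }
}
```

The stored value is an `Integer` equal to `Integer.valueOf(0)`, so assertions that compare against it behave the same either way.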

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CEFHandlingException;
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * CEF (Common Event Format) based implementation for {@code org.apache.nifi.serialization.RecordReader}. The implementation
+ * builds on top of the {@code com.fluenda.parcefone.parser.CEFParser} which is responsible to deal with the textual representation
+ * of the events. This implementation intends to fill the gap between the CEF parser's {@code com.fluenda.parcefone.event.CommonEvent}
+ * data objects and the NiFi's internal Record representation.
+ */
+final class CEFRecordReader implements RecordReader {
+    /**
+     * Currently these are not used but set the way it follows the expected CEF format.
+     */
+    private static final String DATE_FORMAT = "MMM dd yyyy";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+    private static final String DATETIME_FORMAT = DATE_FORMAT + " " + TIME_FORMAT;
+
+    private static final Supplier<DateFormat> DATE_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(DATE_FORMAT);
+    private static final Supplier<DateFormat> TIME_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(TIME_FORMAT);
+    private static final Supplier<DateFormat> DATETIME_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(DATETIME_FORMAT);
+
+    private final RecordSchema schema;
+    private final BufferedReader reader;
+    private final CEFParser parser;
+    private final ComponentLog logger;
+    private final Locale locale;
+    private final String rawMessageField;
+    private final String invalidField;
+
+    /**
+     * It would not cause any functional drawback to acquire the custom extensions all the time, but there are some cases
+     * (inferred schema when custom extension fields are not included) where we can be sure about that these fields are not
+     * required. For better performance, in these cases we do not work with these fields.
+     */
+    private final boolean includeCustomExtensions;
+    private final boolean acceptEmptyExtensions;
+
+    CEFRecordReader(
+        final InputStream inputStream,
+        final RecordSchema recordSchema,
+        final CEFParser parser,
+        final ComponentLog logger,
+        final Locale locale,
+        final String rawMessageField,
+        final String invalidField,
+        final boolean includeCustomExtensions,
+        final boolean acceptEmptyExtensions
+    ) {
+        this.reader = new BufferedReader(new InputStreamReader(inputStream));
+        this.schema = recordSchema;
+        this.parser = parser;
+        this.logger = logger;
+        this.locale = locale;
+        this.rawMessageField = rawMessageField;
+        this.invalidField = invalidField;
+        this.includeCustomExtensions = includeCustomExtensions;
+        this.acceptEmptyExtensions = acceptEmptyExtensions;
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        final String line = nextLine();
+
+        if (line == null) {
+            return null;
+        }
+
+        final CommonEvent event = parser.parse(line, true, acceptEmptyExtensions, locale);
+
+        if (event == null) {
+            logger.debug("Event parsing resulted no event");
+
+            if (invalidField != null && !invalidField.isEmpty()) {
+                return new MapRecord(schema, Collections.singletonMap(invalidField, line));
+            } else {
+                throw new MalformedRecordException("The following line could not be parsed by the CEF parser: " + line);
+            }
+        }
+
+        final Map<String, Object> values = new HashMap<>();
+
+        try {
+            event.getHeader().entrySet().forEach(field -> values.put(field.getKey(), convertValue(field.getKey(), field.getValue(), coerceTypes)));
+            event.getExtension(true, includeCustomExtensions).entrySet().forEach(field -> values.put(field.getKey(), convertValue(field.getKey() ,field.getValue(), coerceTypes)));
+
+            for (final String fieldName : schema.getFieldNames()) {
+                if (!values.containsKey(fieldName)) {
+                    values.put(fieldName, null);
+                }
+            }
+
+        } catch (final CEFHandlingException e) {
+            throw new MalformedRecordException("Error during extracting information from CEF event", e);
+        }
+
+        if (rawMessageField != null) {
+            values.put(rawMessageField, line);
+        }
+
+        return new MapRecord(schema, values, true, dropUnknownFields);
+    }
+
+    private String nextLine() throws IOException {
+        String line;
+
+        while((line = reader.readLine()) != null) {
+            if (!line.isEmpty()) {
+                break;
+            }
+        }
+
+        return line;
+    }
+
+    private Object convertValue(final String fieldName, final Object fieldValue, final boolean coerceType) {
+        final DataType dataType = schema.getDataType(fieldName).orElse(RecordFieldType.STRING.getDataType());
+
+        return coerceType
+            ? convert(fieldValue, dataType, fieldName)
+            : convertSimpleIfPossible(fieldValue, dataType, fieldName);
+    }
+
+    private final Object convert(final Object value, final DataType dataType, final String fieldName) {

Review comment:
       The `final` keyword is unnecessary on private methods, since a private method cannot be overridden.
   ```suggestion
       private Object convert(final Object value, final DataType dataType, final String fieldName) {
   ```
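
To illustrate the point outside of the PR code, here is a minimal sketch. The class and method names (`PrivateFinalDemo`, `Base`, `Derived`, `callConvert`) are made up for this example and are not part of NiFi. A subclass method with the same name as a private superclass method is a separate method rather than an override, so `final` adds no protection on a private method.

```java
public class PrivateFinalDemo {

    static class Base {
        // Private methods are not inherited, so they can never be overridden.
        // Declaring this method final would not change anything.
        private String convert() {
            return "base";
        }

        String callConvert() {
            // Always binds to Base.convert(), regardless of the runtime type.
            return convert();
        }
    }

    static class Derived extends Base {
        // A new, unrelated method that merely shares the name.
        String convert() {
            return "derived";
        }
    }

    public static void main(String[] args) {
        final Base value = new Derived();
        System.out.println(value.callConvert()); // prints "base"
    }
}
```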

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCEFReader {
+    private TestRunner runner;
+    private TestCEFProcessor processor;
+    private CEFReader reader;
+
+    @BeforeEach
+    public void setUp() {
+        runner = null;
+        processor = null;
+        reader = null;
+    }
+
+    @Test
+    public void testValidatingReaderWhenRawFieldValueIsInvalid() throws Exception {
+        setUpReader();
+        setRawField("dst"); // Invalid because there is an extension with the same name
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+
+        assertReaderIsInvalid();
+    }
+
+    @Test
+    public void testReadingSingleRowWithHeaderFields() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithHeaderFieldsAndRaw() throws Exception {
+        setUpReader();
+        setRawField(TestCEFUtil.RAW_FIELD);
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, Collections.singletonMap(TestCEFUtil.RAW_FIELD, TestCEFUtil.RAW_VALUE));
+    }
+
+    @Test
+    public void testReadingSingleRowWithExtensionFieldsWhenSchemaIsHeadersOnly() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithCustomExtensionFieldsAsStrings() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_AS_STRINGS);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, "123"));
+    }
+
+    @Test
+    public void testMisformattedRowsWithoutInvalidFieldIsBeingSet() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessorWithError(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+    }
+
+    @Test
+    public void testMisformattedRowsWithInvalidFieldIsSet() throws Exception {
+        setUpReader();
+        setInvalidField();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MISFORMATTED_ROW);
+
+        assertNumberOfResults(3);
+        assertFieldIsSet(1, "invalid", "Oct 12 04:16:11 localhost CEF:0|nxlog.org|nxlog|2.7.1243|");
+    }
+
+    @Test
+    public void testReadingSingleRowWithCustomExtensionFields() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, 123));
+    }
+
+    @Test
+    public void testReadingMultipleRows() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_IDENTICAL_ROWS);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsWithEmptyRows() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingMultipleRowsStartingWithEmptyRow() throws Exception {
+        setUpReader();
+        setSchemaIsInferred(CEFReader.HEADERS_ONLY);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW);
+
+        assertNumberOfResults(3);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testWithPredefinedSchema() throws Exception {
+        setUpReader();
+        setSchema(CEFSchemaUtil.getHeaderFields());
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES);
+    }
+
+    @Test
+    public void testReadingSingleRowWithEmptyExtensionFields() throws Exception {
+        setUpReader();
+        setAcceptEmptyExtensions();
+        setSchemaIsInferred(CEFReader.CUSTOM_EXTENSIONS_INFERRED);
+        enableReader();
+
+        triggerProcessor(TestCEFUtil.INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS);
+
+        assertNumberOfResults(1);
+        assertFieldsAre(TestCEFUtil.EXPECTED_HEADER_VALUES, TestCEFUtil.EXPECTED_EXTENSION_VALUES, Collections.singletonMap(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, ""));
+    }
+
+    private void setUpReader() throws InitializationException {
+        processor = new TestCEFProcessor();
+        runner = TestRunners.newTestRunner(processor);
+        reader = new CEFReader();
+        runner.addControllerService("reader", reader);
+        runner.setProperty(TestCEFProcessor.READER, "reader");
+    }
+
+    private void setAcceptEmptyExtensions() {
+        runner.setProperty(reader, CEFReader.ACCEPT_EMPTY_EXTENSIONS, "true");
+    }
+
+    private void setSchemaIsInferred(final AllowableValue value) {
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
+        runner.setProperty(reader, CEFReader.INFERENCE_STRATEGY, value);
+    }
+
+    private void setInvalidField() {
+        runner.setProperty(reader, CEFReader.INVALID_FIELD, "invalid");
+    }
+
+    private void setSchema(List<RecordField> fields) throws InitializationException {
+        final MockSchemaRegistry registry = new MockSchemaRegistry();
+        registry.addSchema("predefinedSchema", new SimpleRecordSchema(fields));
+        runner.addControllerService("registry", registry);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_NAME, "predefinedSchema");
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
+        runner.enableControllerService(registry);
+    }
+
+    private void setRawField(final String value) {
+        runner.setProperty(reader, CEFReader.RAW_FIELD, value);
+    }
+
+    private void enableReader() {
+        runner.assertValid(reader);
+        runner.enableControllerService(reader);
+    }
+
+    private void triggerProcessor(final String input) throws FileNotFoundException {
+        runner.enqueue(new FileInputStream(input));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TestCEFProcessor.SUCCESS);
+    }
+
+    private void triggerProcessorWithError(final String input) {
+        try {
+            runner.enqueue(new FileInputStream(input));
+            runner.run();
+            Assertions.fail();
+        } catch (final Throwable e) {
+            // the TestCEFProcessor wraps the original exception into a RuntimeException
+            Assertions.assertTrue(e.getCause() instanceof RuntimeException);
+            Assertions.assertTrue(e.getCause().getCause() instanceof IOException);
+        }

Review comment:
       This should be changed to use `assertThrows`; additional checks on the returned exception can then use other assertions.
   
   ```suggestion
           runner.enqueue(new FileInputStream(input));
           final RuntimeException exception = assertThrows(RuntimeException.class, () -> runner.run());
   ```
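
The pattern can be sketched without a test framework on the classpath. In the example below, `assertThrows` and `Executable` are small hypothetical stand-ins that only mimic the shape of JUnit 5's `Assertions.assertThrows`, and `failingRun` is an invented substitute for `runner.run()`. In the actual test the JUnit method would be used instead. The point is that the helper returns the caught exception, so the cause can be checked with ordinary assertions afterwards.

```java
import java.io.IOException;

public class AssertThrowsSketch {

    // Hypothetical stand-in for the functional interface JUnit 5 accepts.
    interface Executable {
        void execute() throws Throwable;
    }

    // Hypothetical stand-in for JUnit 5's assertThrows: runs the body, fails if
    // nothing (or the wrong type) is thrown, and returns the exception otherwise.
    static <T extends Throwable> T assertThrows(final Class<T> expected, final Executable body) {
        try {
            body.execute();
        } catch (final Throwable actual) {
            if (expected.isInstance(actual)) {
                return expected.cast(actual);
            }
            throw new AssertionError("Unexpected exception type: " + actual.getClass().getName(), actual);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    // Invented substitute for runner.run(): wraps an IOException in a RuntimeException.
    static void failingRun() {
        throw new RuntimeException(new IOException("line could not be parsed"));
    }

    public static void main(String[] args) {
        final RuntimeException exception = assertThrows(RuntimeException.class, AssertThrowsSketch::failingRun);

        // Follow-up check on the returned exception.
        if (!(exception.getCause() instanceof IOException)) {
            throw new AssertionError("Expected an IOException as the cause");
        }
        System.out.println(exception.getCause().getMessage()); // prints "line could not be parsed"
    }
}
```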

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.validation.Validation;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+
+@Tags({"cef", "record", "reader", "parser"})
+@CapabilityDescription("Parses CEF (Common Event Format) events, returning each row as a record. "
+    + "This reader allows for inferring a schema based on the first event in the FlowFile or providing an explicit schema for interpreting the values.")
+public final class CEFReader extends SchemaRegistryService implements RecordReaderFactory {
+
+    static final AllowableValue HEADERS_ONLY = new AllowableValue("headers-only", "Headers only", "Includes only CEF header fields into the inferred schema.");
+    static final AllowableValue HEADERS_AND_EXTENSIONS = new AllowableValue("headers-and-extensions", "Headers and extensions",
+            "Includes the CEF header and extension fields to the schema, but not the custom extensions.");
+    static final AllowableValue CUSTOM_EXTENSIONS_AS_STRINGS = new AllowableValue("custom-extensions-as-string", "With custom extensions as strings",
+            "Includes all fields into the inferred schema, involving custom extension fields as string values.");
+    static final AllowableValue CUSTOM_EXTENSIONS_INFERRED = new AllowableValue("custom-extensions-inferred", "With custom extensions inferred",
+            "Includes all fields into the inferred schema, involving custom extension fields with inferred data types. " +
+            "The inference works based on the values in the FlowFile. In some scenarios this might result unsatisfiable behaviour. " +
+            "In these cases it is suggested to use \"" + CUSTOM_EXTENSIONS_AS_STRINGS.getDisplayName() + "\" Inference Strategy or predefined schema.");
+
+    static final PropertyDescriptor INFERENCE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("inference-strategy")
+            .displayName("Inference Strategy")
+            .description("Defines the set of fields should be included in the schema and the way the fields are being interpreted.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .allowableValues(HEADERS_ONLY, HEADERS_AND_EXTENSIONS, CUSTOM_EXTENSIONS_AS_STRINGS, CUSTOM_EXTENSIONS_INFERRED)
+            .defaultValue(CUSTOM_EXTENSIONS_INFERRED.getValue())
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA)
+            .build();
+
+    static final PropertyDescriptor RAW_FIELD = new PropertyDescriptor.Builder()
+            .name("raw-message-field")
+            .displayName("Raw Message Field")
+            .description("If set the raw message will be added to the record using the property value as field name. This is not the same as the \"rawEvent\" extension field!")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INVALID_FIELD = new PropertyDescriptor.Builder()
+            .name("invalid-message-field")
+            .displayName("Invalid Field")
+            .description("Used when a line in the FlowFile cannot be parsed by the CEF parser. " +
+                    "If set, instead of failing to process the FlowFile, a record is being added with one field. " +
+                    "This record contains one field with the name specified by the property and the raw message as value.")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DATETIME_REPRESENTATION = new PropertyDescriptor.Builder()
+            .name("datetime-representation")
+            .displayName("DateTime Locale")
+            .description("The IETF BCP 47 representation of the Locale to be used when parsing date " +
+                    "fields with long or short month names (e.g. may <en-US> vs. mai. <fr-FR>. The default" +
+                    "value is generally safe. Only change if having issues parsing CEF messages")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new ValidateLocale())
+            .defaultValue("en-US")
+            .build();
+
+    static final PropertyDescriptor ACCEPT_EMPTY_EXTENSIONS = new PropertyDescriptor.Builder()
+            .name("accept-empty-extensions")
+            .displayName("Accept empty extensions")
+            .description("If set to true, empty extensions will be accepted and will be associated to a null value.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    private final javax.validation.Validator validator = Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    private volatile String rawMessageField;
+    private volatile String invalidField;
+    private volatile Locale parcefoneLocale;
+    private volatile boolean includeCustomExtensions;
+    private volatile boolean acceptEmptyExtensions;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RAW_FIELD);
+        properties.add(INVALID_FIELD);
+        properties.add(DATETIME_REPRESENTATION);
+        properties.add(INFERENCE_STRATEGY);
+
+        properties.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA)
+                .build());
+
+        properties.add(ACCEPT_EMPTY_EXTENSIONS);
+        return properties;
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>();
+        allowableValues.addAll(super.getSchemaAccessStrategyValues());
+        allowableValues.add(SchemaInferenceUtil.INFER_SCHEMA);
+        return allowableValues;
+    }
+
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return SchemaInferenceUtil.INFER_SCHEMA;
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+        if (strategy.equals(SchemaInferenceUtil.INFER_SCHEMA.getValue())) {
+            final String inferenceStrategy = context.getProperty(INFERENCE_STRATEGY).getValue();
+            final CEFSchemaInferenceBuilder builder = new CEFSchemaInferenceBuilder();
+
+            if (inferenceStrategy.equals(HEADERS_AND_EXTENSIONS.getValue())) {
+                builder.withExtensions();
+            } else if (inferenceStrategy.equals(CUSTOM_EXTENSIONS_AS_STRINGS.getValue())) {
+                builder.withCustomExtensions(CEFCustomExtensionTypeResolver.STRING_RESOLVER);
+            } else if (inferenceStrategy.equals(CUSTOM_EXTENSIONS_INFERRED.getValue())) {
+                builder.withCustomExtensions(CEFCustomExtensionTypeResolver.SIMPLE_RESOLVER);
+            }
+
+            if (rawMessageField != null) {
+                builder.withRawMessage(rawMessageField);
+            }
+
+            if (invalidField != null) {
+                builder.withInvalidField(invalidField);
+            }
+
+            final boolean failFast = invalidField == null || invalidField.isEmpty();
+            final CEFSchemaInference inference = builder.build();
+            return SchemaInferenceUtil.getSchemaAccessStrategy(
+                strategy,
+                context,
+                getLogger(),
+                (variables, in) -> new CEFRecordSource(in, parser, parcefoneLocale, acceptEmptyExtensions, failFast),
+                () -> inference,
+                () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, context));
+        }
+
+        return super.getSchemaAccessStrategy(strategy, schemaRegistry, context);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        rawMessageField = context.getProperty(RAW_FIELD).evaluateAttributeExpressions().getValue();
+        invalidField = context.getProperty(INVALID_FIELD).evaluateAttributeExpressions().getValue();
+        parcefoneLocale = Locale.forLanguageTag(context.getProperty(DATETIME_REPRESENTATION).evaluateAttributeExpressions().getValue());
+
+        final String inferenceStrategy = context.getProperty(INFERENCE_STRATEGY).getValue();
+        final boolean inferenceNeedsCustomExtensions = !inferenceStrategy.equals(HEADERS_ONLY.getValue()) && !inferenceStrategy.equals(HEADERS_AND_EXTENSIONS.getValue());
+        final boolean isInferSchema =  context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue().equals(SchemaInferenceUtil.INFER_SCHEMA.getValue());
+        includeCustomExtensions = !isInferSchema || (isInferSchema && inferenceNeedsCustomExtensions);

Review comment:
       This can be simplified as follows:
   ```suggestion
           includeCustomExtensions = !isInferSchema || inferenceNeedsCustomExtensions;
   ```
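
The equivalence behind this suggestion can be checked exhaustively, since there are only four input combinations. The sketch below uses an invented class name (`BooleanSimplification`) and is only meant to show that `!a || (a && b)` and `!a || b` agree for every input.

```java
public class BooleanSimplification {

    // The expression as written in the PR.
    static boolean original(final boolean isInferSchema, final boolean inferenceNeedsCustomExtensions) {
        return !isInferSchema || (isInferSchema && inferenceNeedsCustomExtensions);
    }

    // The suggested form: the right-hand side of || is only evaluated when
    // isInferSchema is true, so repeating that check is redundant.
    static boolean simplified(final boolean isInferSchema, final boolean inferenceNeedsCustomExtensions) {
        return !isInferSchema || inferenceNeedsCustomExtensions;
    }

    // Walks the full truth table and reports whether both forms agree everywhere.
    static boolean equivalentForAllInputs() {
        final boolean[] values = {false, true};
        for (final boolean a : values) {
            for (final boolean b : values) {
                if (original(a, b) != simplified(a, b)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(equivalentForAllInputs()); // prints "true"
    }
}
```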

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFReader.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.bval.jsr.ApacheValidationProvider;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.validation.Validation;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
+import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+
+@Tags({"cef", "record", "reader", "parser"})
+@CapabilityDescription("Parses CEF (Common Event Format) events, returning each row as a record. "
+    + "This reader allows for inferring a schema based on the first event in the FlowFile or providing an explicit schema for interpreting the values.")
+public final class CEFReader extends SchemaRegistryService implements RecordReaderFactory {
+
+    static final AllowableValue HEADERS_ONLY = new AllowableValue("headers-only", "Headers only", "Includes only CEF header fields into the inferred schema.");
+    static final AllowableValue HEADERS_AND_EXTENSIONS = new AllowableValue("headers-and-extensions", "Headers and extensions",
+            "Includes the CEF header and extension fields to the schema, but not the custom extensions.");
+    static final AllowableValue CUSTOM_EXTENSIONS_AS_STRINGS = new AllowableValue("custom-extensions-as-string", "With custom extensions as strings",
+            "Includes all fields into the inferred schema, involving custom extension fields as string values.");
+    static final AllowableValue CUSTOM_EXTENSIONS_INFERRED = new AllowableValue("custom-extensions-inferred", "With custom extensions inferred",
+            "Includes all fields into the inferred schema, involving custom extension fields with inferred data types. " +
+            "The inference works based on the values in the FlowFile. In some scenarios this might result unsatisfiable behaviour. " +
+            "In these cases it is suggested to use \"" + CUSTOM_EXTENSIONS_AS_STRINGS.getDisplayName() + "\" Inference Strategy or predefined schema.");
+
+    static final PropertyDescriptor INFERENCE_STRATEGY = new PropertyDescriptor.Builder()
+            .name("inference-strategy")
+            .displayName("Inference Strategy")
+            .description("Defines the set of fields should be included in the schema and the way the fields are being interpreted.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .allowableValues(HEADERS_ONLY, HEADERS_AND_EXTENSIONS, CUSTOM_EXTENSIONS_AS_STRINGS, CUSTOM_EXTENSIONS_INFERRED)
+            .defaultValue(CUSTOM_EXTENSIONS_INFERRED.getValue())
+            .dependsOn(SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA)
+            .build();
+
+    static final PropertyDescriptor RAW_FIELD = new PropertyDescriptor.Builder()
+            .name("raw-message-field")
+            .displayName("Raw Message Field")
+            .description("If set the raw message will be added to the record using the property value as field name. This is not the same as the \"rawEvent\" extension field!")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INVALID_FIELD = new PropertyDescriptor.Builder()
+            .name("invalid-message-field")
+            .displayName("Invalid Field")
+            .description("Used when a line in the FlowFile cannot be parsed by the CEF parser. " +
+                    "If set, instead of failing to process the FlowFile, a record is being added with one field. " +
+                    "This record contains one field with the name specified by the property and the raw message as value.")
+            .addValidator(new ValidateRawField())
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor DATETIME_REPRESENTATION = new PropertyDescriptor.Builder()
+            .name("datetime-representation")
+            .displayName("DateTime Locale")
+            .description("The IETF BCP 47 representation of the Locale to be used when parsing date " +
+                    "fields with long or short month names (e.g. may <en-US> vs. mai. <fr-FR>. The default" +
+                    "value is generally safe. Only change if having issues parsing CEF messages")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new ValidateLocale())
+            .defaultValue("en-US")
+            .build();
+
+    static final PropertyDescriptor ACCEPT_EMPTY_EXTENSIONS = new PropertyDescriptor.Builder()
+            .name("accept-empty-extensions")
+            .displayName("Accept empty extensions")
+            .description("If set to true, empty extensions will be accepted and will be associated to a null value.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .build();
+
+    private final javax.validation.Validator validator = Validation.byProvider(ApacheValidationProvider.class).configure().buildValidatorFactory().getValidator();
+    private final CEFParser parser = new CEFParser(validator);
+
+    private volatile String rawMessageField;
+    private volatile String invalidField;
+    private volatile Locale parcefoneLocale;
+    private volatile boolean includeCustomExtensions;
+    private volatile boolean acceptEmptyExtensions;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(RAW_FIELD);
+        properties.add(INVALID_FIELD);
+        properties.add(DATETIME_REPRESENTATION);
+        properties.add(INFERENCE_STRATEGY);
+
+        properties.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA)
+                .build());
+
+        properties.add(ACCEPT_EMPTY_EXTENSIONS);
+        return properties;
+    }
+
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>();
+        allowableValues.addAll(super.getSchemaAccessStrategyValues());

Review comment:
       This can be collapsed into one declaration:
   ```suggestion
           final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
   ```

##########
File path: nifi-commons/nifi-record/pom.xml
##########
@@ -15,6 +15,14 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.12.0</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>

Review comment:
       Reviewing the comments on this module, it should not have any external dependencies.  Although `NumberUtils.isParsable()` is convenient, the approach should be refactored to avoid this dependency.
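
       One way to drop the dependency, sketched under assumptions: a small helper that uses only `java.lang`. The names `NumericText` and `isParsable` are hypothetical, and the accepted grammar (optional leading minus, decimal digits, at most one decimal point, no exponent or hex) only approximates what `NumberUtils.isParsable()` accepts; it is not a verified drop-in replacement.

```java
// Hypothetical helper; not part of the PR or of nifi-record.
final class NumericText {

    private NumericText() {
    }

    // Returns true for plain decimal text such as "42", "-7" or "3.14".
    // Scientific notation, hex, blanks and a trailing '.' are rejected.
    static boolean isParsable(final String value) {
        if (value == null || value.isEmpty()) {
            return false;
        }

        // Skip a single leading minus sign, if present.
        int index = value.charAt(0) == '-' ? 1 : 0;
        boolean digitSeen = false;
        boolean pointSeen = false;

        for (; index < value.length(); index++) {
            final char current = value.charAt(index);

            if (current >= '0' && current <= '9') {
                digitSeen = true;
            } else if (current == '.' && !pointSeen) {
                pointSeen = true;
            } else {
                return false;
            }
        }

        return digitSeen && value.charAt(value.length() - 1) != '.';
    }
}
```

       Whether edge cases such as `".5"` should be accepted would need to be checked against the behavior the inference code relies on.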

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFRecordReader.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import com.fluenda.parcefone.event.CEFHandlingException;
+import com.fluenda.parcefone.event.CommonEvent;
+import com.fluenda.parcefone.parser.CEFParser;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * CEF (Common Event Format) based implementation for {@code org.apache.nifi.serialization.RecordReader}. The implementation
+ * builds on top of the {@code com.fluenda.parcefone.parser.CEFParser} which is responsible to deal with the textual representation
+ * of the events. This implementation intends to fill the gap between the CEF parser's {@code com.fluenda.parcefone.event.CommonEvent}
+ * data objects and the NiFi's internal Record representation.
+ */
+final class CEFRecordReader implements RecordReader {
+    /**
+     * Currently these are not used but set the way it follows the expected CEF format.
+     */
+    private static final String DATE_FORMAT = "MMM dd yyyy";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+    private static final String DATETIME_FORMAT = DATE_FORMAT + " " + TIME_FORMAT;
+
+    private static final Supplier<DateFormat> DATE_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(DATE_FORMAT);
+    private static final Supplier<DateFormat> TIME_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(TIME_FORMAT);
+    private static final Supplier<DateFormat> DATETIME_FORMAT_SUPPLIER = () -> DataTypeUtils.getDateFormat(DATETIME_FORMAT);
+
+    private final RecordSchema schema;
+    private final BufferedReader reader;
+    private final CEFParser parser;
+    private final ComponentLog logger;
+    private final Locale locale;
+    private final String rawMessageField;
+    private final String invalidField;
+
+    /**
+     * It would not cause any functional drawback to acquire the custom extensions all the time, but there are some cases
+     * (inferred schema when custom extension fields are not included) where we can be sure about that these fields are not
+     * required. For better performance, in these cases we do not work with these fields.
+     */
+    private final boolean includeCustomExtensions;
+    private final boolean acceptEmptyExtensions;
+
+    CEFRecordReader(
+        final InputStream inputStream,
+        final RecordSchema recordSchema,
+        final CEFParser parser,
+        final ComponentLog logger,
+        final Locale locale,
+        final String rawMessageField,
+        final String invalidField,
+        final boolean includeCustomExtensions,
+        final boolean acceptEmptyExtensions
+    ) {
+        this.reader = new BufferedReader(new InputStreamReader(inputStream));
+        this.schema = recordSchema;
+        this.parser = parser;
+        this.logger = logger;
+        this.locale = locale;
+        this.rawMessageField = rawMessageField;
+        this.invalidField = invalidField;
+        this.includeCustomExtensions = includeCustomExtensions;
+        this.acceptEmptyExtensions = acceptEmptyExtensions;
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+        final String line = nextLine();
+
+        if (line == null) {
+            return null;
+        }
+
+        final CommonEvent event = parser.parse(line, true, acceptEmptyExtensions, locale);
+
+        if (event == null) {
+            logger.debug("Event parsing resulted in no event");
+
+            if (invalidField != null && !invalidField.isEmpty()) {
+                return new MapRecord(schema, Collections.singletonMap(invalidField, line));
+            } else {
+                throw new MalformedRecordException("The following line could not be parsed by the CEF parser: " + line);
+            }
+        }
+
+        final Map<String, Object> values = new HashMap<>();
+
+        try {
+            event.getHeader().entrySet().forEach(field -> values.put(field.getKey(), convertValue(field.getKey(), field.getValue(), coerceTypes)));
+            event.getExtension(true, includeCustomExtensions).entrySet().forEach(field -> values.put(field.getKey(), convertValue(field.getKey() ,field.getValue(), coerceTypes)));
+
+            for (final String fieldName : schema.getFieldNames()) {
+                if (!values.containsKey(fieldName)) {
+                    values.put(fieldName, null);
+                }
+            }
+
+        } catch (final CEFHandlingException e) {
+            throw new MalformedRecordException("Error during extracting information from CEF event", e);
+        }
+
+        if (rawMessageField != null) {
+            values.put(rawMessageField, line);
+        }
+
+        return new MapRecord(schema, values, true, dropUnknownFields);
+    }
+
+    private String nextLine() throws IOException {
+        String line;
+
+        while((line = reader.readLine()) != null) {
+            if (!line.isEmpty()) {
+                break;
+            }
+        }
+
+        return line;
+    }
+
+    private Object convertValue(final String fieldName, final Object fieldValue, final boolean coerceType) {
+        final DataType dataType = schema.getDataType(fieldName).orElse(RecordFieldType.STRING.getDataType());
+
+        return coerceType
+            ? convert(fieldValue, dataType, fieldName)
+            : convertSimpleIfPossible(fieldValue, dataType, fieldName);
+    }
+
+    private final Object convert(final Object value, final DataType dataType, final String fieldName) {
+        return DataTypeUtils.convertType(prepareValue(value), dataType, DATE_FORMAT_SUPPLIER, TIME_FORMAT_SUPPLIER, DATETIME_FORMAT_SUPPLIER, fieldName);
+    }
+
+    private final Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) {

Review comment:
       The `final` keyword is unnecessary on private methods.
   ```suggestion
       private Object convertSimpleIfPossible(final Object value, final DataType dataType, final String fieldName) {
   ```

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class TestCEFUtil {
+    static final String RAW_FIELD = "raw";
+    static final String RAW_VALUE = "Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|";
+
+    static final String INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY = "src/test/resources/cef/single-row-header-fields-only.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EXTENSIONS = "src/test/resources/cef/single-row-with-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION = "src/test/resources/cef/single-row-with-empty-extension.txt";
+    static final String INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-empty-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD = "src/test/resources/cef/single-row-with-incorrect-header-field.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt";
+    static final String INPUT_EMPTY_ROW = "src/test/resources/cef/empty-row.txt";
+    static final String INPUT_MISFORMATTED_ROW = "src/test/resources/cef/misformatted-row.txt";
+    static final String INPUT_MULTIPLE_IDENTICAL_ROWS = "src/test/resources/cef/multiple-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES = "src/test/resources/cef/multiple-rows-with-different-custom-types.txt";
+    static final String INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW = "src/test/resources/cef/multiple-rows-starting-with-empty-row.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS = "src/test/resources/cef/multiple-rows-with-empty-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt";
+
+    static final Map<String, Object> EXPECTED_HEADER_VALUES = new HashMap<>();
+    static final Map<String, Object> EXPECTED_EXTENSION_VALUES = new HashMap<>();
+
+    static final String CUSTOM_EXTENSION_FIELD_NAME = "loginsequence";
+    static final RecordField CUSTOM_EXTENSION_FIELD = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.INT.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_STRING = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.STRING.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_CHOICE = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.FLOAT.getDataType(), RecordFieldType.STRING.getDataType()
+    ));
+
+    static {
+        EXPECTED_HEADER_VALUES.put("version", Integer.valueOf(0));
+        EXPECTED_HEADER_VALUES.put("deviceVendor", "Company");
+        EXPECTED_HEADER_VALUES.put("deviceProduct", "Product");
+        EXPECTED_HEADER_VALUES.put("deviceVersion", "1.2.3");
+        EXPECTED_HEADER_VALUES.put("deviceEventClassId", "audit-login");
+        EXPECTED_HEADER_VALUES.put("name", "Successful login");
+        EXPECTED_HEADER_VALUES.put("severity", "3");
+
+        EXPECTED_EXTENSION_VALUES.put("cn1Label", "userid");
+        EXPECTED_EXTENSION_VALUES.put("spt", Integer.valueOf(46117));
+        EXPECTED_EXTENSION_VALUES.put("cn1", Long.valueOf(99999));
+        EXPECTED_EXTENSION_VALUES.put("cfp1", Float.valueOf(1.23F));

Review comment:
       The `valueOf()` wrapping calls are unnecessary since the compiler will handle boxing.
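
       As an illustration of the point, a minimal sketch (the class name `BoxingExample` is hypothetical; the keys and values are copied from the lines above): literals placed into a `Map<String, Object>` are boxed by the compiler according to their literal type, so the `L` and `F` suffixes are what select `Long` and `Float`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; values mirror the test fixture quoted above.
final class BoxingExample {

    private BoxingExample() {
    }

    static Map<String, Object> expectedExtensionValues() {
        final Map<String, Object> values = new HashMap<>();
        values.put("spt", 46117);   // int literal, boxed to Integer
        values.put("cn1", 99999L);  // long literal, boxed to Long
        values.put("cfp1", 1.23F);  // float literal, boxed to Float
        return values;
    }
}
```

       Note that a bare `99999` would box to `Integer`, so the `L` suffix is needed to keep the `Long` type that `Long.valueOf(99999)` produced.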




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] exceptionfactory commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r791996926



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaUtil.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+final class CEFSchemaUtil {
+
+    private static final List<RecordField> HEADER_FIELDS = new ArrayList<>();
+
+    static {
+        // Reference states that severity might be represented by integer values (0-10) and string values (like High) too
+        HEADER_FIELDS.add(new RecordField("version", RecordFieldType.INT.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceVendor", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceProduct", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceVersion", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceEventClassId", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("severity", RecordFieldType.STRING.getDataType()));
+    }
+
+    // Fields known by the CEF Extension Dictionary in version 23
+    private static final Set<String> EXTENSIONS_STRING = new HashSet<>(Arrays.asList("act", "app", "c6a1Label", "c6a2Label", "c6a3Label",

Review comment:
       It would be very helpful to declare these values one per line, ideally alphabetized.  Although it would take some effort, it would make it much easier to maintain. On the other hand, would it be better to make String the default fallback type and avoid having to declare all of these fields?
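
       The fallback idea could look roughly like the sketch below. It is only an illustration: `ExtensionTypes` and `FieldKind` are hypothetical stand-ins (the real code would use NiFi's `DataType`), and the three sample entries take their types from the expected values in `TestCEFUtil`, not from a full reading of the CEF Extension Dictionary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NiFi's DataType, to keep the sketch self-contained.
enum FieldKind { STRING, INT, LONG, FLOAT }

final class ExtensionTypes {

    // Only non-String extensions are declared, one per line, alphabetized.
    private static final Map<String, FieldKind> NON_STRING = new HashMap<>();

    static {
        NON_STRING.put("cfp1", FieldKind.FLOAT);
        NON_STRING.put("cn1", FieldKind.LONG);
        NON_STRING.put("spt", FieldKind.INT);
    }

    private ExtensionTypes() {
    }

    // Any extension that is not declared above falls back to STRING.
    static FieldKind kindOf(final String extensionName) {
        return NON_STRING.getOrDefault(extensionName, FieldKind.STRING);
    }
}
```

       The trade-off is that a misspelled or newly introduced non-String extension would silently be treated as a String.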

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/ValidateLocale.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import java.util.Locale;
+
+/**
+ * This class is identical to {@code org.apache.nifi.processors.standard.ParseCEF.ValidateLocale}.

Review comment:
       In light of this comment, did you consider creating a new module under `nifi-commons` to avoid duplicating this Validator class?

##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFReader.java
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestCEFReader {
+    private TestRunner runner;
+    private TestCEFProcessor processor;
+    private CEFReader reader;
+
+    @Before
+    public void setUp() {
+        runner = null;
+        processor = null;
+        reader = null;
+    }
+
+    @Test
+    public void testValidatingReaderWhenRawFieldValueIsInvalid() throws Exception {
+        // given
+        givenReaderSetUp();
+        givenRawFieldIs("dst"); // Invalid because there is an extension with the same name
+        givenSchemaIsInferred(CEFReader.HEADERS_ONLY);
+
+        // when & then
+        thenAssertInvalid();

Review comment:
       The method naming does not follow the usual test code conventions.  Although the quality and style of current tests vary widely, we should avoid introducing more `given / when / then` comments and method names.  Having reusable test methods for setup and higher level assertions is helpful, but the names should avoid the behavior-driven naming.







[GitHub] [nifi] exceptionfactory commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1024268213


   Thanks for the updates and relocating to the DataType inference utilities @simonbence! This looks ready to merge pending a clean build. Will run through one more round of verification.





[GitHub] [nifi] pvillard31 commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1014770503


   Thanks @simonbence - the catch you describe is perfectly fine and is something I'm expecting to be honest. Users can easily remove this field if this is problematic for them.
   
   I made another round of testing and this is working perfectly (at least, compared to my expectations :)). A follow-up improvement would be to submit a pull request against the third-party library so that we can get the reason why the parsing failed for a given record (this is currently something we can only get by looking in the logs). Maybe I'll find the motivation to look into this in the future.
   
   From a pure feature point of view, I'm a +1 on the pull request given my tests. @exceptionfactory do you want to have another look at this? Any recommendations/comments from a code perspective? Are you OK with the dep. updates?
   
   Thanks!





[GitHub] [nifi] simonbence commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r792576830



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/cef/CEFSchemaUtil.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+final class CEFSchemaUtil {
+
+    private static final List<RecordField> HEADER_FIELDS = new ArrayList<>();
+
+    static {
+        // Reference states that severity might be represented by integer values (0-10) and string values (like High) too
+        HEADER_FIELDS.add(new RecordField("version", RecordFieldType.INT.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceVendor", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceProduct", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceVersion", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("deviceEventClassId", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        HEADER_FIELDS.add(new RecordField("severity", RecordFieldType.STRING.getDataType()));
+    }
+
+    // Fields known by the CEF Extension Dictionary in version 23
+    private static final Set<String> EXTENSIONS_STRING = new HashSet<>(Arrays.asList("act", "app", "c6a1Label", "c6a2Label", "c6a3Label",

Review comment:
       The order is deliberate: it follows the list of possible extensions in the CEF Extension Dictionary. Because of the expectations of the CEF format (for date types as well), falling back to String does not seem to be a good idea here.







[GitHub] [nifi] simonbence commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1007774608


   Thank you for your time and insights! For most of the design decisions I followed the existing patterns, but your concerns are valid. Based on your suggestions, I created a possible solution; please take a look. I also added some new test cases, which might make it easier to understand the possible implications of this change. Thanks!





[GitHub] [nifi] pvillard31 commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1007342283


   Hey @simonbence - I gave this another try and I'm a bit concerned about how this would work when processing a flow file containing a lot of CEF logs but with one line not being correctly formatted according to CEF specifications. Right now, if a single line cannot be parsed, the whole flow file would fail (when using a processor such as ConvertRecord). Can we think of a solution to always successfully parse a line but parse it in a specific way so that filtering between "good" and "bad" records can be done? I'm wondering if, similarly to the property RAW_FIELD, we could have an INVALID_FIELD, and if specified, the line would not be parsed but a record with a single field containing the raw line would be produced. This way, when using a processor like QueryRecord, it'd be super easy to do the conversion from CEF to JSON while also filtering bad and good records into two distinct flow files. Thoughts?
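
   The proposal can be sketched as follows, assuming a hypothetical `parse` function that returns `null` for an unparsable line. Plain maps stand in for NiFi records, and `InvalidLineSketch` and `readAll` are illustrative names, not the PR's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; maps stand in for NiFi Record instances.
final class InvalidLineSketch {

    private InvalidLineSketch() {
    }

    // Turns every line into a record. When invalidField is set, a line the parser
    // rejects (null result) becomes a single-field record holding the raw line.
    // Without invalidField, the first unparsable line fails the whole batch.
    static List<Map<String, Object>> readAll(
            final List<String> lines,
            final Function<String, Map<String, Object>> parse,
            final String invalidField) {
        final List<Map<String, Object>> records = new ArrayList<>();

        for (final String line : lines) {
            final Map<String, Object> parsed = parse.apply(line);

            if (parsed != null) {
                records.add(parsed);
            } else if (invalidField != null && !invalidField.isEmpty()) {
                records.add(Collections.<String, Object>singletonMap(invalidField, line));
            } else {
                throw new IllegalArgumentException("Could not parse line: " + line);
            }
        }

        return records;
    }
}
```

   A downstream step could then separate good and bad records by checking whether the invalid field is present.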





[GitHub] [nifi] simonbence commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-987258188


   > I gave this a try (I didn't look at the code though) and this is a really nice improvement! I'd recommend updating this PR to also include the changes added in #5555. Once done, I'll run more tests with some data I have on my systems.
   
   Thank you very much for bringing this to my attention, this is a valuable addition indeed! I incorporated it into the reader's functionality. Let me also highlight two smaller details:
   - With "standard extensions", the behaviour of the schema inference might be unexpected, but this is because the reader mimics the extension dictionary.
   - In the case of String-typed extensions, the parser seems to work somewhat differently. I added tests for this, and I hope you do not mind that I extended your test with this discovery as well.
   
   Again, thank you for your invested time!





[GitHub] [nifi] simonbence commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1014249231


   > Yep, case 1 is what I was expecting but I was surprised by the behavior of case 2 :)
   
   I refined the handling of the records that are invalid from the CEF parser's point of view, and I think it is now in line with what you suggested. There is a catch, however, that I would like to highlight before closing the thread: if the invalid field is set, it now becomes a permanent part of the schema, which, depending on the writer, might result in empty invalid fields in the outgoing records even for healthy events. In your example, this will happen with the default JsonRecordSetWriter settings. Of course, changing the value of "Suppress Null Values" will fix this, but it is something to be aware of. Also, not every writer is able to suppress this; the CSV writer, for example, is not.
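
To make the catch above concrete, here is a small sketch of the effect. It is not how JsonRecordSetWriter is implemented: `NullSuppressionSketch` and `toJson` are hypothetical, records are plain maps, and string escaping is deliberately left out. It only shows that a schema field without a value is written as null unless the writer drops null values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class NullSuppressionSketch {

    // Renders a flat record as a JSON object. No string escaping is done,
    // so this is only suitable for the simple values used in this sketch.
    static String toJson(Map<String, Object> record, boolean suppressNulls) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            Object value = entry.getValue();
            if (value == null && suppressNulls) {
                continue; // skip fields that are in the schema but have no value
            }
            String rendered;
            if (value == null) {
                rendered = "null";
            } else if (value instanceof Number) {
                rendered = value.toString();
            } else {
                rendered = "\"" + value + "\"";
            }
            joiner.add("\"" + entry.getKey() + "\":" + rendered);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // A healthy event: the invalid field is part of the schema but empty.
        Map<String, Object> healthy = new LinkedHashMap<>();
        healthy.put("name", "Successful login");
        healthy.put("invalid", null);

        System.out.println(toJson(healthy, false));
        System.out.println(toJson(healthy, true));
    }
}
```

A writer with a fixed column layout, such as CSV, has no equivalent of the second call: the column exists for every row, so the empty value stays visible.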





[GitHub] [nifi] pvillard31 commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1011403675


   Hey @simonbence - gave this another try and I think there is a problem.
   
   I use a file where I have thousands of CEF events including some not correctly formatted.
   
   - 1st case: ConvertRecord (CEF Reader / JSON Writer). Raw property is set. Invalid property is not set. The ConvertRecord fails and the whole flow file goes to failure without any conversion.
   - 2nd case: ConvertRecord (CEF Reader / JSON Writer). Raw property is set. Invalid property is set. The ConvertRecord does not fail and the whole flow file goes to success with successful conversion to JSON. However, there is no record containing the invalid field and all records contain the raw field.
   
   I don't think this is the expected behavior, right?





[GitHub] [nifi] simonbence commented on pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#issuecomment-1012066630


   Hi @pvillard31 !
   
   Thanks for checking! Case 1 looks reasonable to me: as far as I am aware, readers throw an exception when they run into parsing issues (AvroRecordReader looks like a good example, as Avro has a fairly strict format, just like CEF), and the processor then handles it in some way. ConvertRecord inherits from AbstractRecordProcessor, where the basic behaviour for these cases is to propagate the exception wrapped in a ProcessException. So, all in all, this looks like the "usual behaviour" for readers. Case 2 looks less fortunate; I need to check it. I will get back to you with my findings.
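
The fail-fast behaviour described for Case 1 can be illustrated with a small sketch. It does not use NiFi classes: `FailFastSketch`, `SketchProcessException`, `readRecord` and `convertAll` are hypothetical stand-ins for the reader, the wrapping exception and the processor loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FailFastSketch {

    // Hypothetical stand-in for the exception a processor would throw.
    static class SketchProcessException extends RuntimeException {
        SketchProcessException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical reader: rejects any line without a CEF marker.
    static String readRecord(String line) {
        int start = line.indexOf("CEF:");
        if (start < 0) {
            throw new IllegalArgumentException("Malformed event: " + line);
        }
        return line.substring(start);
    }

    // Processor-like loop: the first reader failure aborts the whole input,
    // and the original exception is kept as the cause.
    static List<String> convertAll(List<String> lines) {
        List<String> converted = new ArrayList<>();
        try {
            for (String line : lines) {
                converted.add(readRecord(line));
            }
        } catch (IllegalArgumentException e) {
            throw new SketchProcessException("Could not process input", e);
        }
        return converted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|",
                "this line is not CEF");
        try {
            convertAll(lines);
        } catch (SketchProcessException e) {
            System.out.println("Whole input failed: " + e.getCause().getMessage());
        }
    }
}
```

Even though the first line is valid, nothing is returned once the second line fails, which corresponds to the whole flow file going to failure.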





[GitHub] [nifi] simonbence commented on a change in pull request #5530: NIFI-9341 Adding record reader for CEF events

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #5530:
URL: https://github.com/apache/nifi/pull/5530#discussion_r794301478



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/cef/TestCEFUtil.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cef;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class TestCEFUtil {
+    static final String RAW_FIELD = "raw";
+    static final String RAW_VALUE = "Oct 12 04:16:11 localhost CEF:0|Company|Product|1.2.3|audit-login|Successful login|3|";
+
+    static final String INPUT_SINGLE_ROW_HEADER_FIELDS_ONLY = "src/test/resources/cef/single-row-header-fields-only.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EXTENSIONS = "src/test/resources/cef/single-row-with-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_EXTENSION = "src/test/resources/cef/single-row-with-empty-extension.txt";
+    static final String INPUT_SINGLE_ROW_WITH_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_EMPTY_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-empty-custom-extensions.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_HEADER_FIELD = "src/test/resources/cef/single-row-with-incorrect-header-field.txt";
+    static final String INPUT_SINGLE_ROW_WITH_INCORRECT_CUSTOM_EXTENSIONS = "src/test/resources/cef/single-row-with-incorrect-custom-extensions.txt";
+    static final String INPUT_EMPTY_ROW = "src/test/resources/cef/empty-row.txt";
+    static final String INPUT_MISFORMATTED_ROW = "src/test/resources/cef/misformatted-row.txt";
+    static final String INPUT_MULTIPLE_IDENTICAL_ROWS = "src/test/resources/cef/multiple-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DIFFERENT_CUSTOM_TYPES = "src/test/resources/cef/multiple-rows-with-different-custom-types.txt";
+    static final String INPUT_MULTIPLE_ROWS_STARTING_WITH_EMPTY_ROW = "src/test/resources/cef/multiple-rows-starting-with-empty-row.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_EMPTY_ROWS = "src/test/resources/cef/multiple-rows-with-empty-rows.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_DECREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-decreasing-number-of-extensions.txt";
+    static final String INPUT_MULTIPLE_ROWS_WITH_INCREASING_NUMBER_OF_EXTENSIONS = "src/test/resources/cef/multiple-rows-increasing-number-of-extensions.txt";
+
+    static final Map<String, Object> EXPECTED_HEADER_VALUES = new HashMap<>();
+    static final Map<String, Object> EXPECTED_EXTENSION_VALUES = new HashMap<>();
+
+    static final String CUSTOM_EXTENSION_FIELD_NAME = "loginsequence";
+    static final RecordField CUSTOM_EXTENSION_FIELD = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.INT.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_STRING = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.STRING.getDataType());
+    static final RecordField CUSTOM_EXTENSION_FIELD_AS_CHOICE = new RecordField(TestCEFUtil.CUSTOM_EXTENSION_FIELD_NAME, RecordFieldType.CHOICE.getChoiceDataType(
+            RecordFieldType.FLOAT.getDataType(), RecordFieldType.STRING.getDataType()
+    ));
+
+    static {
+        EXPECTED_HEADER_VALUES.put("version", Integer.valueOf(0));
+        EXPECTED_HEADER_VALUES.put("deviceVendor", "Company");
+        EXPECTED_HEADER_VALUES.put("deviceProduct", "Product");
+        EXPECTED_HEADER_VALUES.put("deviceVersion", "1.2.3");
+        EXPECTED_HEADER_VALUES.put("deviceEventClassId", "audit-login");
+        EXPECTED_HEADER_VALUES.put("name", "Successful login");
+        EXPECTED_HEADER_VALUES.put("severity", "3");
+
+        EXPECTED_EXTENSION_VALUES.put("cn1Label", "userid");
+        EXPECTED_EXTENSION_VALUES.put("spt", Integer.valueOf(46117));
+        EXPECTED_EXTENSION_VALUES.put("cn1", Long.valueOf(99999));
+        EXPECTED_EXTENSION_VALUES.put("cfp1", Float.valueOf(1.23F));

Review comment:
       This is intentional: even if it is not necessary, it highlights what I wanted to test here and communicates my intent somewhat more clearly.



