Posted to commits@nifi.apache.org by pv...@apache.org on 2021/02/23 05:50:38 UTC

[nifi] branch main updated: NIFI-8232 CSV Parsers optionally allow/reject duplicate header names

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cb26ae  NIFI-8232 CSV Parsers optionally allow/reject duplicate header names
3cb26ae is described below

commit 3cb26aec728b578a9ccf397a338b4c7fcc334d89
Author: Chris Sampson <ch...@digital.homeoffice.gov.uk>
AuthorDate: Wed Feb 17 21:45:32 2021 +0000

    NIFI-8232 CSV Parsers optionally allow/reject duplicate header names
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4828.
---
 .../main/java/org/apache/nifi/csv/CSVUtils.java    | 41 ++++++++----
 .../java/org/apache/nifi/csv/CSVUtilsTest.java     | 23 ++++---
 .../main/java/org/apache/nifi/csv/CSVReader.java   | 15 +++--
 .../apache/nifi/csv/JacksonCSVRecordReader.java    | 15 +++++
 .../nifi/csv/TestCSVHeaderSchemaStrategy.java      | 30 ++++++++-
 .../org/apache/nifi/csv/TestCSVRecordReader.java   | 77 ++++++++++++++++------
 .../nifi/csv/TestJacksonCSVRecordReader.java       | 57 +++++++++++++---
 7 files changed, 201 insertions(+), 57 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
index da0eaef..bb45292 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -33,7 +33,7 @@ import java.util.Map;
 
 public class CSVUtils {
 
-    private static Logger LOG = LoggerFactory.getLogger(CSVUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CSVUtils.class);
 
     public static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format",
         "The format of the CSV is configured by using the properties of this Controller Service, such as Value Separator");
@@ -136,6 +136,20 @@ public class CSVUtils {
         .defaultValue("UTF-8")
         .required(true)
         .build();
+    public static final PropertyDescriptor ALLOW_DUPLICATE_HEADER_NAMES = new PropertyDescriptor.Builder()
+        .name("csvutils-allow-duplicate-header-names")
+        .displayName("Allow Duplicate Header Names")
+        .description("Whether duplicate header names are allowed. Header names are case-sensitive, for example \"name\" and \"Name\" are treated as separate fields. " +
+                "Handling of duplicate header names is CSV Parser specific (where applicable):\n" +
+                "* Apache Commons CSV - duplicate headers will result in column data \"shifting\" right with new fields " +
+                "created for \"unknown_field_index_X\" where \"X\" is the CSV column index number\n" +
+                "* Jackson CSV - duplicate headers will be de-duplicated with the field value being that of the right-most " +
+                "duplicate CSV column")
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(false)
+        .build();
 
     // CSV Format fields for writers only
     public static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character.");
@@ -177,6 +191,10 @@ public class CSVUtils {
         .required(true)
         .build();
 
+    private CSVUtils() {
+        // intentionally blank, prevents instantiation
+    }
+
     public static boolean isDynamicCSVFormat(final PropertyContext context) {
         final String formatName = context.getProperty(CSV_FORMAT).getValue();
         return formatName.equalsIgnoreCase(CUSTOM.getValue())
@@ -208,8 +226,8 @@ public class CSVUtils {
         }
     }
 
-    private static Character getCharUnescapedJava(final PropertyContext context, final PropertyDescriptor property, final Map<String, String> variables) {
-        String value = context.getProperty(property).evaluateAttributeExpressions(variables).getValue();
+    private static Character getValueSeparatorCharUnescapedJava(final PropertyContext context, final Map<String, String> variables) {
+        String value = context.getProperty(VALUE_SEPARATOR).evaluateAttributeExpressions(variables).getValue();
 
         if (value != null) {
             String unescaped = unescapeJava(value);
@@ -218,13 +236,9 @@ public class CSVUtils {
             }
         }
 
-        LOG.warn("'{}' property evaluated to an invalid value: \"{}\". It must be a single character. The property value will be ignored.", property.getName(), value);
+        LOG.warn("'{}' property evaluated to an invalid value: \"{}\". It must be a single character. The property value will be ignored.", VALUE_SEPARATOR.getName(), value);
 
-        if (property.getDefaultValue() != null) {
-            return property.getDefaultValue().charAt(0);
-        } else {
-            return null;
-        }
+        return VALUE_SEPARATOR.getDefaultValue().charAt(0);
     }
 
     private static Character getCharUnescaped(final PropertyContext context, final PropertyDescriptor property, final Map<String, String> variables) {
@@ -247,7 +261,7 @@ public class CSVUtils {
     }
 
     private static CSVFormat buildCustomFormat(final PropertyContext context, final Map<String, String> variables) {
-        final Character valueSeparator = getCharUnescapedJava(context, VALUE_SEPARATOR, variables);
+        final Character valueSeparator = getValueSeparatorCharUnescapedJava(context, variables);
         CSVFormat format = CSVFormat.newFormat(valueSeparator)
             .withAllowMissingColumnNames()
             .withIgnoreEmptyLines();
@@ -293,6 +307,11 @@ public class CSVUtils {
             format = format.withRecordSeparator(separator);
         }
 
+        final PropertyValue allowDuplicateHeaderNames = context.getProperty(ALLOW_DUPLICATE_HEADER_NAMES);
+        if (allowDuplicateHeaderNames != null && allowDuplicateHeaderNames.isSet()) {
+            format = format.withAllowDuplicateHeaderNames(allowDuplicateHeaderNames.asBoolean());
+        }
+
         return format;
     }
 
@@ -306,7 +325,7 @@ public class CSVUtils {
 
     public static String unescape(final String input) {
         if (input == null) {
-            return input;
+            return null;
         }
 
         return input.replace("\\t", "\t")
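
A minimal, standalone sketch (not part of this commit) of the toggle that buildCustomFormat() now applies, assuming commons-csv 1.7+ where CSVFormat.withAllowDuplicateHeaderNames(boolean) is available:

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class DuplicateHeaderToggleDemo {
        public static void main(String[] args) throws Exception {
            final String csv = "id,id,name\n1,2,Joe";

            // Duplicates allowed (the property's default of "true"): parsing succeeds.
            final CSVFormat allow = CSVFormat.DEFAULT.withFirstRecordAsHeader()
                    .withAllowDuplicateHeaderNames(true);
            try (CSVParser parser = CSVParser.parse(csv, allow)) {
                for (final CSVRecord record : parser) {
                    System.out.println(record.get("name")); // Joe
                }
            }

            // Duplicates rejected: the parser throws while initialising the header.
            final CSVFormat reject = CSVFormat.DEFAULT.withFirstRecordAsHeader()
                    .withAllowDuplicateHeaderNames(false);
            try (CSVParser parser = CSVParser.parse(csv, reject)) {
                parser.getRecords(); // never reached
            } catch (final IllegalArgumentException e) {
                System.out.println(e.getMessage()); // "The header contains a duplicate name: ..."
            }
        }
    }
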
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/csv/CSVUtilsTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/csv/CSVUtilsTest.java
index b3f4e08..ca42eca 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/csv/CSVUtilsTest.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/csv/CSVUtilsTest.java
@@ -36,7 +36,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testIsDynamicCSVFormatWithStaticProperties() {
-        PropertyContext context = createContext("|", "'", "^", "~");
+        PropertyContext context = createContext("|", "'", "^", "~", "true");
 
         boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);
 
@@ -45,7 +45,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testIsDynamicCSVFormatWithDynamicValueSeparator() {
-        PropertyContext context = createContext("${csv.delimiter}", "'", "^", "~");
+        PropertyContext context = createContext("${csv.delimiter}", "'", "^", "~", "true");
 
         boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);
 
@@ -54,7 +54,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testIsDynamicCSVFormatWithDynamicQuoteCharacter() {
-        PropertyContext context = createContext("|", "${csv.quote}", "^", "~");
+        PropertyContext context = createContext("|", "${csv.quote}", "^", "~", "true");
 
         boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);
 
@@ -63,7 +63,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testIsDynamicCSVFormatWithDynamicEscapeCharacter() {
-        PropertyContext context = createContext("|", "'", "${csv.escape}", "~");
+        PropertyContext context = createContext("|", "'", "${csv.escape}", "~", "true");
 
         boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);
 
@@ -72,7 +72,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testIsDynamicCSVFormatWithDynamicCommentMarker() {
-        PropertyContext context = createContext("|", "'", "^", "${csv.comment}");
+        PropertyContext context = createContext("|", "'", "^", "${csv.comment}", "true");
 
         boolean isDynamicCSVFormat = CSVUtils.isDynamicCSVFormat(context);
 
@@ -81,7 +81,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testCustomFormat() {
-        PropertyContext context = createContext("|", "'", "^", "~");
+        PropertyContext context = createContext("|", "'", "^", "~", "true");
 
         CSVFormat csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());
 
@@ -89,11 +89,12 @@ public class CSVUtilsTest {
         assertEquals('\'', (char) csvFormat.getQuoteCharacter());
         assertEquals('^', (char) csvFormat.getEscapeCharacter());
         assertEquals('~', (char) csvFormat.getCommentMarker());
+        assertTrue(csvFormat.getAllowDuplicateHeaderNames());
     }
 
     @Test
     public void testCustomFormatWithEL() {
-        PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
+        PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}", "false");
 
         Map<String, String> attributes = new HashMap<>();
         attributes.put("csv.delimiter", "|");
@@ -107,11 +108,12 @@ public class CSVUtilsTest {
         assertEquals('\'', (char) csvFormat.getQuoteCharacter());
         assertEquals('^', (char) csvFormat.getEscapeCharacter());
         assertEquals('~', (char) csvFormat.getCommentMarker());
+        assertFalse(csvFormat.getAllowDuplicateHeaderNames());
     }
 
     @Test
     public void testCustomFormatWithELEmptyValues() {
-        PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
+        PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}", "true");
 
         CSVFormat csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());
 
@@ -123,7 +125,7 @@ public class CSVUtilsTest {
 
     @Test
     public void testCustomFormatWithELInvalidValues() {
-        PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}");
+        PropertyContext context = createContext("${csv.delimiter}", "${csv.quote}", "${csv.escape}", "${csv.comment}", "true");
 
         Map<String, String> attributes = new HashMap<>();
         attributes.put("csv.delimiter", "invalid");
@@ -139,13 +141,14 @@ public class CSVUtilsTest {
         assertNull(csvFormat.getCommentMarker());
     }
 
-    private PropertyContext createContext(String valueSeparator, String quoteChar, String escapeChar, String commentMarker) {
+    private PropertyContext createContext(String valueSeparator, String quoteChar, String escapeChar, String commentMarker, String allowDuplicateHeaderNames) {
         Map<PropertyDescriptor, String> properties = new HashMap<>();
 
         properties.put(CSVUtils.VALUE_SEPARATOR, valueSeparator);
         properties.put(CSVUtils.QUOTE_CHAR, quoteChar);
         properties.put(CSVUtils.ESCAPE_CHAR, escapeChar);
         properties.put(CSVUtils.COMMENT_MARKER, commentMarker);
+        properties.put(CSVUtils.ALLOW_DUPLICATE_HEADER_NAMES, allowDuplicateHeaderNames);
 
         return new MockConfigurationContext(properties, null);
     }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index a70ccf5..4774a17 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -110,6 +110,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         properties.add(CSVUtils.NULL_STRING);
         properties.add(CSVUtils.TRIM_FIELDS);
         properties.add(CSVUtils.CHARSET);
+        properties.add(CSVUtils.ALLOW_DUPLICATE_HEADER_NAMES);
         return properties;
     }
 
@@ -146,17 +147,17 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null);
         in.reset();
 
-        CSVFormat csvFormat;
+        final CSVFormat format;
         if (this.csvFormat != null) {
-            csvFormat = this.csvFormat;
+            format = this.csvFormat;
         } else {
-            csvFormat = CSVUtils.createCSVFormat(context, variables);
+            format = CSVUtils.createCSVFormat(context, variables);
         }
 
-        if(APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
-            return new CSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
-        } else if(JACKSON_CSV.getValue().equals(csvParser)) {
-            return new JacksonCSVRecordReader(in, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+        if (APACHE_COMMONS_CSV.getValue().equals(csvParser)) {
+            return new CSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
+        } else if (JACKSON_CSV.getValue().equals(csvParser)) {
+            return new JacksonCSVRecordReader(in, logger, schema, format, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, charSet);
         } else {
             throw new IOException("Parser not supported");
         }
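
For completeness, a hedged sketch of exercising the new CSVReader property from a unit test with the nifi-mock TestRunner; MyRecordProcessor is a hypothetical record-based processor used only for illustration, not something added by this commit:

    import org.apache.nifi.csv.CSVReader;
    import org.apache.nifi.csv.CSVUtils;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class CSVReaderDuplicateHeaderConfigTest {
        @Test
        public void testRejectDuplicateHeaderNames() throws Exception {
            // MyRecordProcessor is a placeholder for any record-based processor.
            final TestRunner runner = TestRunners.newTestRunner(MyRecordProcessor.class);
            final CSVReader csvReader = new CSVReader();
            runner.addControllerService("csv-reader", csvReader);
            // The new property defaults to "true"; set it to "false" to reject duplicates.
            runner.setProperty(csvReader, CSVUtils.ALLOW_DUPLICATE_HEADER_NAMES, "false");
            runner.enableControllerService(csvReader);
        }
    }
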
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
index 975a9b1..f3c3acc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
@@ -24,9 +24,11 @@ import java.io.Reader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.io.input.BOMInputStream;
@@ -49,6 +51,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
     private final MappingIterator<String[]> recordStream;
     private List<String> rawFieldNames = null;
+    private boolean allowDuplicateHeaderNames;
 
     private volatile static CsvMapper mapper = new CsvMapper().enable(CsvParser.Feature.WRAP_AS_ARRAY);
 
@@ -75,6 +78,7 @@ public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
                 csvSchemaBuilder = csvSchemaBuilder.setSkipFirstDataRow(true);
             }
         }
+        allowDuplicateHeaderNames = csvFormat.getAllowDuplicateHeaderNames();
 
         CsvSchema csvSchema = csvSchemaBuilder.build();
 
@@ -108,6 +112,17 @@ public class JacksonCSVRecordReader extends AbstractCSVRecordReader {
                     rawFieldNames = schema.getFieldNames();
                 } else {
                     rawFieldNames = Arrays.asList(csvRecord);
+                    if (rawFieldNames.size() > schema.getFieldCount() && !allowDuplicateHeaderNames) {
+                        final Set<String> deDupe = new HashSet<>(schema.getFieldCount());
+                        for (final String name : rawFieldNames) {
+                            if (!deDupe.add(name)) {
+                                    throw new IllegalArgumentException(String.format(
+                                            "The header contains a duplicate name: \"%s\" in %s. If this is valid then use CSVFormat.withAllowDuplicateHeaderNames().",
+                                            name, rawFieldNames
+                                    ));
+                            }
+                        }
+                    }
 
                     // Advance the stream to keep the record count correct
                     if (recordStream.hasNext()) {
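
The guard added to JacksonCSVRecordReader reduces to a set-membership check: the first name rejected by HashSet.add is the duplicate. A standalone sketch of that check, mirroring the exception text the new tests assert against:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class DuplicateHeaderCheckDemo {
        public static void main(String[] args) {
            final List<String> rawFieldNames = Arrays.asList("id", "id", "name", "balance");

            final Set<String> seen = new HashSet<>();
            for (final String name : rawFieldNames) {
                // HashSet.add returns false when the name was already present,
                // i.e. the header contains a duplicate.
                if (!seen.add(name)) {
                    throw new IllegalArgumentException(String.format(
                            "The header contains a duplicate name: \"%s\" in %s. "
                                    + "If this is valid then use CSVFormat.withAllowDuplicateHeaderNames().",
                            name, rawFieldNames));
                }
            }
            System.out.println("No duplicate header names found");
        }
    }
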
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
index e61d2a6..8d057bf 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
@@ -67,8 +67,36 @@ public class TestCSVHeaderSchemaStrategy {
     }
 
     @Test
+    public void testContainsDuplicateHeaderNames() throws SchemaNotFoundException, IOException {
+        final String headerLine = "a, a, b";
+        final byte[] headerBytes = headerLine.getBytes();
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(CSVUtils.CSV_FORMAT, CSVUtils.CUSTOM.getValue());
+        properties.put(CSVUtils.COMMENT_MARKER, "#");
+        properties.put(CSVUtils.VALUE_SEPARATOR, ",");
+        properties.put(CSVUtils.TRIM_FIELDS, "true");
+        properties.put(CSVUtils.QUOTE_CHAR, "\"");
+        properties.put(CSVUtils.ESCAPE_CHAR, "\\");
+
+        final ConfigurationContext context = new MockConfigurationContext(properties, null);
+        final CSVHeaderSchemaStrategy strategy = new CSVHeaderSchemaStrategy(context);
+
+        final RecordSchema schema;
+        try (final InputStream bais = new ByteArrayInputStream(headerBytes)) {
+            schema = strategy.getSchema(null, bais, null);
+        }
+
+        final List<String> expectedFieldNames = Arrays.asList("a", "b");
+        assertEquals(expectedFieldNames, schema.getFieldNames());
+
+        assertTrue(schema.getFields().stream()
+                .allMatch(field -> field.getDataType().equals(RecordFieldType.STRING.getDataType())));
+    }
+
+    @Test
     public void testWithEL() throws SchemaNotFoundException, IOException {
-        final String headerLine = "\'a\'; b; c; d; e^;z; f";
+        final String headerLine = "'a'; b; c; d; e^;z; f";
         final byte[] headerBytes = headerLine.getBytes();
 
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index ca36430..33f350c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -53,6 +52,7 @@ import java.util.TimeZone;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 
 public class TestCSVRecordReader {
     private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
@@ -71,12 +71,6 @@ public class TestCSVRecordReader {
             RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII");
     }
 
-    private CSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format,
-                                         final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
-        return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
-                dateFormat, timeFormat, timestampFormat, "ASCII");
-    }
-
     @Test
     public void testUTF8() throws IOException, MalformedRecordException {
         final String text = "name\n黃凱揚";
@@ -178,7 +172,7 @@ public class TestCSVRecordReader {
 
             final Record record = reader.nextRecord(false, false);
             // When the values are not in the expected format, a String is returned unmodified
-            assertEquals("11/30/1983", (String)record.getValue("date"));
+            assertEquals("11/30/1983", record.getValue("date"));
         }
     }
 
@@ -195,7 +189,7 @@ public class TestCSVRecordReader {
                      null, RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
             final Record record = reader.nextRecord(false, false);
-            assertEquals("1983-01-01", (String)record.getValue("date"));
+            assertEquals("1983-01-01", record.getValue("date"));
         }
     }
 
@@ -212,7 +206,7 @@ public class TestCSVRecordReader {
                      "", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
             final Record record = reader.nextRecord(false, false);
-            assertEquals("1983-01-01", (String)record.getValue("date"));
+            assertEquals("1983-01-01", record.getValue("date"));
         }
     }
 
@@ -252,7 +246,7 @@ public class TestCSVRecordReader {
                      RecordFieldType.DATE.getDefaultFormat(), "HH-MM-SS", RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
             final Record record = reader.nextRecord(false, false);
-            assertEquals("01:02:03", (String)record.getValue("time"));
+            assertEquals("01:02:03", record.getValue("time"));
         }
     }
 
@@ -269,7 +263,7 @@ public class TestCSVRecordReader {
                      RecordFieldType.DATE.getDefaultFormat(), null, RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
             final Record record = reader.nextRecord(false, false);
-            assertEquals("01:02:03", (String)record.getValue("time"));
+            assertEquals("01:02:03", record.getValue("time"));
         }
     }
 
@@ -286,7 +280,7 @@ public class TestCSVRecordReader {
                      RecordFieldType.DATE.getDefaultFormat(), "", RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
             final Record record = reader.nextRecord(false, false);
-            assertEquals("01:02:03", (String)record.getValue("time"));
+            assertEquals("01:02:03", record.getValue("time"));
         }
     }
 
@@ -326,7 +320,7 @@ public class TestCSVRecordReader {
                      RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), "HH-MM-SS", "UTF-8")) {
 
             final Record record = reader.nextRecord(false, false);
-            assertEquals("01:02:03", (String)record.getValue("timestamp"));
+            assertEquals("01:02:03", record.getValue("timestamp"));
         }
     }
 
@@ -338,7 +332,7 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account.csv");
             final CSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] record = reader.nextRecord().getValues();
@@ -351,7 +345,7 @@ public class TestCSVRecordReader {
 
     @Test
     public void testExcelFormat() throws IOException, MalformedRecordException {
-        final List<RecordField> fields = new ArrayList<RecordField>();
+        final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("fieldA", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("fieldB", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
@@ -379,7 +373,7 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account.csv");
             final CSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
@@ -401,7 +395,7 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/extra-white-space.csv");
             final CSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
@@ -558,7 +552,6 @@ public class TestCSVRecordReader {
 
             assertNull(reader.nextRecord());
         }
-
     }
 
     @Test
@@ -593,6 +586,50 @@ public class TestCSVRecordReader {
     }
 
     @Test
+    public void testDuplicateHeaderNames() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, id, name, name, balance, BALANCE, address, city, state, zipCode, country";
+        final String inputRecord = "1, Another ID, John, Smith, 40.80, 10.20, 123 My Street, My City, MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord has shifted data columns right by 1 after the duplicate "id" & "name" header names
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final CSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("Another ID", record.getValue("name"));
+            assertEquals("John", record.getValue("balance"));
+            assertEquals("Smith", record.getValue("BALANCE"));
+            assertEquals("40.80", record.getValue("address"));
+            assertEquals("10.20", record.getValue("city"));
+            assertEquals("123 My Street", record.getValue("state"));
+            assertEquals("My City", record.getValue("zipCode"));
+            assertEquals("MS", record.getValue("country"));
+            assertEquals("11111", record.getValue("unknown_field_index_9"));
+            assertEquals("USA", record.getValue("unknown_field_index_10"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+
+        // confirm duplicate headers cause an exception when requested
+        final CSVFormat disallowDuplicateHeadersFormat = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withAllowDuplicateHeaderNames(false);
+        try (final InputStream bais = new ByteArrayInputStream(inputData)) {
+            final IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> createReader(bais, schema, disallowDuplicateHeadersFormat));
+            assertEquals(
+                    "The header contains a duplicate name: \"id\" in [id, id, name, name, balance, BALANCE, address, city, state, zipCode, country]. " +
+                            "If this is valid then use CSVFormat.withAllowDuplicateHeaderNames().",
+                    iae.getMessage()
+            );
+        }
+    }
+
+    @Test
     public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException {
 
         char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
@@ -603,7 +640,7 @@ public class TestCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_escapedchar.csv");
             final CSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
index a9585ce..8026da0 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestJacksonCSVRecordReader.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,6 +44,7 @@ import java.util.TimeZone;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 
 public class TestJacksonCSVRecordReader {
     private final DataType doubleDataType = RecordFieldType.DOUBLE.getDataType();
@@ -113,7 +113,7 @@ public class TestJacksonCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account.csv");
             final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] record = reader.nextRecord().getValues();
@@ -126,7 +126,7 @@ public class TestJacksonCSVRecordReader {
 
     @Test
     public void testExcelFormat() throws IOException, MalformedRecordException {
-        final List<RecordField> fields = new ArrayList<RecordField>();
+        final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("fieldA", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("fieldB", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
@@ -154,7 +154,7 @@ public class TestJacksonCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account.csv");
             final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
@@ -176,7 +176,7 @@ public class TestJacksonCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/extra-white-space.csv");
             final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
@@ -333,7 +333,6 @@ public class TestJacksonCSVRecordReader {
 
             assertNull(reader.nextRecord());
         }
-
     }
 
     @Test
@@ -368,6 +367,48 @@ public class TestJacksonCSVRecordReader {
     }
 
     @Test
+    public void testDuplicateHeaderNames() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, id, name, name, balance, BALANCE, address, city, state, zipCode, country";
+        final String inputRecord = "1, Another ID, John, Smith, 40.80, 10.20, 123 My Street, My City, MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord has ignored the first "id" and "name" columns
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final JacksonCSVRecordReader reader = createReader(bais, schema, format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("Another ID", record.getValue("id"));
+            assertEquals("Smith", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertEquals("USA", record.getValue("country"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+
+        // confirm duplicate headers cause an exception when requested
+        final CSVFormat disallowDuplicateHeadersFormat = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withAllowDuplicateHeaderNames(false);
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final JacksonCSVRecordReader reader = createReader(bais, schema, disallowDuplicateHeadersFormat)) {
+            final IllegalArgumentException iae = assertThrows(IllegalArgumentException.class, () -> reader.nextRecord(false, false));
+            assertEquals(
+                    "The header contains a duplicate name: \"id\" in [id, id, name, name, balance, BALANCE, address, city, state, zipCode, country]. " +
+                            "If this is valid then use CSVFormat.withAllowDuplicateHeaderNames().",
+                    iae.getMessage()
+            );
+        }
+    }
+
+    @Test
     public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException {
 
         char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
@@ -378,7 +419,7 @@ public class TestJacksonCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/multi-bank-account_escapedchar.csv");
             final JacksonCSVRecordReader reader = createReader(fis, schema, format)) {
 
             final Object[] firstRecord = reader.nextRecord().getValues();
@@ -400,7 +441,7 @@ public class TestJacksonCSVRecordReader {
 
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
+        try (final InputStream fis = new FileInputStream("src/test/resources/csv/single-bank-account.csv");
              final JacksonCSVRecordReader reader = createReader(fis, schema, formatWithNullRecordSeparator)) {
 
             final Object[] record = reader.nextRecord().getValues();