You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/09/08 08:46:46 UTC

[nifi] branch main updated: NIFI-7770 Add features to csv lookup services

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 73b7ff8  NIFI-7770 Add features to csv lookup services
73b7ff8 is described below

commit 73b7ff8fd4a5f4b0b4e7ff4bdf1b7f97728df9f5
Author: Tamás Bunth <bt...@gmail.com>
AuthorDate: Tue Aug 25 11:10:02 2020 +0200

    NIFI-7770 Add features to csv lookup services
    
    Add the following functionalities:
    - Custom value separator (default is comma)
    - Custom quote char to use (default is " i.e. quote sign)
    - Quote mode
    - Escape character to use (default is no escape character)
    - Comment marker
    - Trim fields
    - Character set to use
    
    The above features use a common implementation with "CSVReader".
    
    Also append a sentence to the capability description that first line of
    csv file is considered header. Setting custom header instead of using
    the first line is not supported (yet).
    
    Also, a minor refactor: CSVRecordLoopkupService and
    SimpleCsvFileLookupService now share common logic in implementation.
    
    CSV Format is extended to the same list as CSVReader. In addition,
    lookup services still have the "default" csv format for compatibility
    reasons.
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4494.
---
 .../nifi-lookup-services/pom.xml                   |   1 +
 .../nifi/lookup/AbstractCSVLookupService.java      | 150 +++++++++++++++++++++
 .../apache/nifi/lookup/CSVRecordLookupService.java | 109 ++-------------
 .../nifi/lookup/SimpleCsvFileLookupService.java    | 110 ++-------------
 .../nifi/lookup/TestCSVRecordLookupService.java    |  25 ++++
 .../lookup/TestSimpleCsvFileLookupService.java     |  23 ++++
 .../src/test/resources/test_sep_escape_comment.csv |   3 +
 7 files changed, 219 insertions(+), 202 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
index 5c258f5..2b1c303 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
@@ -186,6 +186,7 @@
                           <exclude>src/test/resources/complex.avsc</exclude>
                           <exclude>src/test/resources/simple.avsc</exclude>
                           <exclude>src/test/resources/test.csv</exclude>
+                          <exclude>src/test/resources/test_sep_escape_comment.csv</exclude>
                           <exclude>src/test/resources/test_Windows-31J.csv</exclude>
                           <exclude>src/test/resources/test.properties</exclude>
                           <exclude>src/test/resources/test.xml</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/AbstractCSVLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/AbstractCSVLookupService.java
new file mode 100644
index 0000000..e09c1d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/AbstractCSVLookupService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
+
+import org.apache.commons.csv.CSVFormat;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.csv.CSVUtils;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+public abstract class AbstractCSVLookupService extends AbstractControllerService {
+
+    protected static final String KEY = "key";
+    public static final AllowableValue RFC4180 = new AllowableValue("RFC4180", "RFC4180",
+            "Same as RFC 4180. Available for compatibility reasons.");
+    public static final AllowableValue DEFAULT = new AllowableValue("default", "Default Format",
+            "Same as custom format. Available for compatibility reasons.");
+
+    public static final PropertyDescriptor CSV_FILE =
+            new PropertyDescriptor.Builder()
+                    .name("csv-file")
+                    .displayName("CSV File")
+                    .description("Path to a CSV File in which the key value pairs can be looked up.")
+                    .required(true)
+                    .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                    .build();
+
+    public static final PropertyDescriptor CHARSET =
+            new PropertyDescriptor.Builder()
+                    .fromPropertyDescriptor(CSVUtils.CHARSET)
+                    .name("Character Set")
+                    .description("The Character Encoding that is used to decode the CSV file.")
+                    .build();
+
+    public static final PropertyDescriptor CSV_FORMAT =
+            new PropertyDescriptor.Builder()
+                    .fromPropertyDescriptor(CSVUtils.CSV_FORMAT)
+                    .allowableValues(Stream.concat(
+                            CSVUtils.CSV_FORMAT.getAllowableValues().stream(),
+                            Stream.of(DEFAULT, RFC4180)).toArray(AllowableValue[]::new))
+                    .defaultValue(DEFAULT.getValue())
+                    .build();
+
+    public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
+            new PropertyDescriptor.Builder()
+                    .name("lookup-key-column")
+                    .displayName("Lookup Key Column")
+                    .description("The field in the CSV file that will serve as the lookup key. " +
+                            "This is the field that will be matched against the property specified in the lookup processor.")
+                    .required(true)
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                    .build();
+
+    public static final PropertyDescriptor IGNORE_DUPLICATES =
+            new PropertyDescriptor.Builder()
+                    .name("ignore-duplicates")
+                    .displayName("Ignore Duplicates")
+                    .description("Ignore duplicate keys for records in the CSV file.")
+                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+                    .allowableValues("true", "false")
+                    .defaultValue("true")
+                    .required(true)
+                    .build();
+
+    protected List<PropertyDescriptor> properties;
+
+    protected volatile String csvFile;
+
+    protected volatile CSVFormat csvFormat;
+
+    protected volatile String charset;
+
+    protected volatile String lookupKeyColumn;
+
+    protected volatile boolean ignoreDuplicates;
+
+    protected volatile SynchronousFileWatcher watcher;
+
+    protected final ReentrantLock lock = new ReentrantLock();
+
+    protected abstract void loadCache() throws IllegalStateException, IOException;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        this.properties = new ArrayList<>();
+        properties.add(CSV_FILE);
+        properties.add(CSV_FORMAT);
+        properties.add(CHARSET);
+        properties.add(LOOKUP_KEY_COLUMN);
+        properties.add(IGNORE_DUPLICATES);
+
+        properties.add(CSVUtils.VALUE_SEPARATOR);
+        properties.add(CSVUtils.QUOTE_CHAR);
+        properties.add(CSVUtils.QUOTE_MODE);
+        properties.add(CSVUtils.COMMENT_MARKER);
+        properties.add(CSVUtils.ESCAPE_CHAR);
+        properties.add(CSVUtils.TRIM_FIELDS);
+    }
+
+    public void onEnabled(final ConfigurationContext context) throws IOException, InitializationException {
+        this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
+        if( context.getProperty(CSV_FORMAT).getValue().equalsIgnoreCase(RFC4180.getValue()) ) {
+            this.csvFormat = CSVFormat.RFC4180;
+        } else {
+            this.csvFormat = CSVUtils.createCSVFormat(context, Collections.emptyMap());
+        }
+        this.charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+        this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
+        this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
+    }
+}
+
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
index 086f9e9..388ae68 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/CSVRecordLookupService.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.lookup;
 
-import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.commons.lang3.StringUtils;
@@ -24,12 +23,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
@@ -37,16 +32,12 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
-import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
 
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -55,92 +46,28 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"})
 @CapabilityDescription(
         "A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, " +
-        "the columns are returned as a Record. All returned fields will be strings."
+        "the columns are returned as a Record. All returned fields will be strings. The first line of the csv file " +
+        "is considered as header."
 )
-public class CSVRecordLookupService extends AbstractControllerService implements RecordLookupService {
-
-    private static final String KEY = "key";
+public class CSVRecordLookupService extends AbstractCSVLookupService implements RecordLookupService {
 
     private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
 
-    public static final PropertyDescriptor CSV_FILE =
-            new PropertyDescriptor.Builder()
-                    .name("csv-file")
-                    .displayName("CSV File")
-                    .description("A CSV file that will serve as the data source.")
-                    .required(true)
-                    .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-                    .build();
-
     static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractCSVLookupService.CSV_FORMAT)
             .name("csv-format")
-            .displayName("CSV Format")
-            .description("Specifies which \"format\" the CSV data is in.")
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
-            .defaultValue(CSVFormat.Predefined.Default.toString())
-            .required(true)
             .build();
 
-    public static final PropertyDescriptor CHARSET =
-            new PropertyDescriptor.Builder()
-                    .name("Character Set")
-                    .description("The Character Encoding that is used to decode the CSV file.")
-                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-                    .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-                    .defaultValue("UTF-8")
-                    .required(true)
-                    .build();
-
-    public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
-            new PropertyDescriptor.Builder()
-                    .name("lookup-key-column")
-                    .displayName("Lookup Key Column")
-                    .description("The field in the CSV file that will serve as the lookup key. " +
-                            "This is the field that will be matched against the property specified in the lookup processor.")
-                    .required(true)
-                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-                    .build();
-
-    public static final PropertyDescriptor IGNORE_DUPLICATES =
-            new PropertyDescriptor.Builder()
-                    .name("ignore-duplicates")
-                    .displayName("Ignore Duplicates")
-                    .description("Ignore duplicate keys for records in the CSV file.")
-                    .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-                    .allowableValues("true", "false")
-                    .defaultValue("true")
-                    .required(true)
-                    .build();
-
-    private List<PropertyDescriptor> properties;
-
     private volatile ConcurrentMap<String, Record> cache;
 
-    private volatile String csvFile;
-
-    private volatile CSVFormat csvFormat;
-
-    private volatile String charset;
-
-    private volatile String lookupKeyColumn;
-
-    private volatile boolean ignoreDuplicates;
-
-    private volatile SynchronousFileWatcher watcher;
-
-    private final ReentrantLock lock = new ReentrantLock();
-
-    private void loadCache() throws IllegalStateException, IOException {
+    @Override
+    protected void loadCache() throws IllegalStateException, IOException {
         if (lock.tryLock()) {
             try {
                 final ComponentLog logger = getLogger();
@@ -194,30 +121,10 @@ public class CSVRecordLookupService extends AbstractControllerService implements
         }
     }
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(CSV_FILE);
-        properties.add(CSV_FORMAT);
-        properties.add(CHARSET);
-        properties.add(LOOKUP_KEY_COLUMN);
-        properties.add(IGNORE_DUPLICATES);
-        this.properties = Collections.unmodifiableList(properties);
-    }
-
     @OnEnabled
+    @Override
     public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
-        this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
-        this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
-        this.charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
-        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
-        this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
-        this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
+        super.onEnabled(context);
         try {
             loadCache();
         } catch (final IllegalStateException e) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
index c58d6bc..3a9ace3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
@@ -17,26 +17,19 @@
 package org.apache.nifi.lookup;
 
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.commons.lang3.StringUtils;
 
@@ -44,63 +37,20 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
-import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
 
 @Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
-@CapabilityDescription("A reloadable CSV file-based lookup service")
-public class SimpleCsvFileLookupService extends AbstractControllerService implements StringLookupService {
-
-    private static final String KEY = "key";
+@CapabilityDescription("A reloadable CSV file-based lookup service. The first line of the csv file is considered as " +
+        "header.")
+public class SimpleCsvFileLookupService extends AbstractCSVLookupService implements StringLookupService {
 
     private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
 
-    public static final PropertyDescriptor CSV_FILE =
-        new PropertyDescriptor.Builder()
-            .name("csv-file")
-            .displayName("CSV File")
-            .description("A CSV file.")
-            .required(true)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
-        .name("CSV Format")
-        .description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.")
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-        .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
-        .defaultValue(CSVFormat.Predefined.Default.toString())
-        .required(true)
-        .build();
-
-    public static final PropertyDescriptor CHARSET =
-        new PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("The Character Encoding that is used to decode the CSV file.")
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .defaultValue("UTF-8")
-            .required(true)
-            .build();
-
-    public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
-        new PropertyDescriptor.Builder()
-            .name("lookup-key-column")
-            .displayName("Lookup Key Column")
-            .description("Lookup key column.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
     public static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
         new PropertyDescriptor.Builder()
             .name("lookup-value-column")
@@ -111,38 +61,12 @@ public class SimpleCsvFileLookupService extends AbstractControllerService implem
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    public static final PropertyDescriptor IGNORE_DUPLICATES =
-        new PropertyDescriptor.Builder()
-            .name("ignore-duplicates")
-            .displayName("Ignore Duplicates")
-            .description("Ignore duplicate keys for records in the CSV file.")
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .required(true)
-            .build();
-
-    private List<PropertyDescriptor> properties;
-
     private volatile ConcurrentMap<String, String> cache;
 
-    private volatile String csvFile;
-
-    private volatile CSVFormat csvFormat;
-
-    private volatile String charset;
-
-    private volatile String lookupKeyColumn;
-
     private volatile String lookupValueColumn;
 
-    private volatile boolean ignoreDuplicates;
-
-    private volatile SynchronousFileWatcher watcher;
-
-    private final ReentrantLock lock = new ReentrantLock();
-
-    private void loadCache() throws IllegalStateException, IOException {
+    @Override
+    protected void loadCache() throws IllegalStateException, IOException {
         if (lock.tryLock()) {
             try {
                 final ComponentLog logger = getLogger();
@@ -181,31 +105,15 @@ public class SimpleCsvFileLookupService extends AbstractControllerService implem
     }
 
     @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(CSV_FILE);
-        properties.add(CSV_FORMAT);
-        properties.add(CHARSET);
-        properties.add(LOOKUP_KEY_COLUMN);
+    protected void init(final ControllerServiceInitializationContext context) {
+        super.init(context);
         properties.add(LOOKUP_VALUE_COLUMN);
-        properties.add(IGNORE_DUPLICATES);
-        this.properties = Collections.unmodifiableList(properties);
     }
 
     @OnEnabled
-    public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException {
-        this.csvFile = context.getProperty(CSV_FILE).evaluateAttributeExpressions().getValue();
-        this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
-        this.charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
-        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
+    public void onEnabled(final ConfigurationContext context) throws  IOException, InitializationException {
+        super.onEnabled(context);
         this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions().getValue();
-        this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
-        this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
         try {
             loadCache();
         } catch (final IllegalStateException e) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
index 158842e..4ef84c6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestCSVRecordLookupService.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Optional;
 
+import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.TestRunner;
@@ -29,6 +30,7 @@ import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
@@ -86,5 +88,28 @@ public class TestCSVRecordLookupService {
         assertThat(property1.get().getAsString("created_at"), is("2017-04-01"));
     }
 
+    @Test
+    public void testCsvRecordLookupServiceWithCustomSeparatorQuotedEscaped() throws InitializationException, LookupFailureException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+        final CSVRecordLookupService service = new CSVRecordLookupService();
+
+        runner.addControllerService("csv-file-lookup-service", service);
+        runner.setProperty(service, SimpleCsvFileLookupService.CSV_FORMAT, "custom");
+        runner.setProperty(service, SimpleCsvFileLookupService.CSV_FILE, "src/test/resources/test_sep_escape_comment.csv");
+        runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_KEY_COLUMN, "key");
+        runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_VALUE_COLUMN, "value");
+        runner.setProperty(service, CSVUtils.VALUE_SEPARATOR, "|");
+        runner.setProperty(service, CSVUtils.QUOTE_CHAR, "\"");
+        runner.setProperty(service, CSVUtils.ESCAPE_CHAR, "%");
+        runner.setProperty(service, CSVUtils.COMMENT_MARKER, "#");
+        runner.setProperty(service, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_ALL);
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        final Optional<Record> my_key = service.lookup(Collections.singletonMap("key", "my_key"));
+        assertTrue(my_key.isPresent());
+        assertEquals("my_value with an escaped |.", my_key.get().getAsString("value"));
+    }
+
 
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
index 371942d..a7a90fe 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Optional;
 
+import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -83,4 +84,26 @@ public class TestSimpleCsvFileLookupService {
         assertThat(property1.isPresent(), is(true));
         assertThat(property1.get(), is("this is property \uff11"));
     }
+
+    @Test
+    public void testSimpleCsvFileLookupServiceWithCustomSeparatorQuotedEscaped() throws InitializationException, LookupFailureException {
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+        final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
+
+        runner.addControllerService("csv-file-lookup-service", service);
+        runner.setProperty(service, SimpleCsvFileLookupService.CSV_FORMAT, "custom");
+        runner.setProperty(service, SimpleCsvFileLookupService.CSV_FILE, "src/test/resources/test_sep_escape_comment.csv");
+        runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_KEY_COLUMN, "key");
+        runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_VALUE_COLUMN, "value");
+        runner.setProperty(service, CSVUtils.VALUE_SEPARATOR, "|");
+        runner.setProperty(service, CSVUtils.QUOTE_CHAR, "\"");
+        runner.setProperty(service, CSVUtils.ESCAPE_CHAR, "%");
+        runner.setProperty(service, CSVUtils.COMMENT_MARKER, "#");
+        runner.setProperty(service, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_ALL);
+        runner.enableControllerService(service);
+        runner.assertValid(service);
+
+        final Optional<String> value = service.lookup(Collections.singletonMap("key", "my_key"));
+        assertEquals(Optional.of("my_value with an escaped |."), value);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test_sep_escape_comment.csv b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test_sep_escape_comment.csv
new file mode 100644
index 0000000..6c2a82b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test_sep_escape_comment.csv
@@ -0,0 +1,3 @@
+"key"|"value"|"created_at"
+# This is a remarkably nice comment
+"my_key"|"my_value with an escaped %|."|"2020-07-30"