You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/03/10 00:08:01 UTC

[nifi] 02/02: NIFI-6047 Cleaned up code to allow tests to run against 1.13.0-snapshot Removed DMC. NIFI-6047 Started integrating changes from NIFI-6014. NIFI-6047 Added DMC tests. NIFI-6047 Added cache identifier recordpath test. NIFI-6047 Added additional details. NIFI-6047 Removed old additional details. NIFI-6047 made some changes requested in a follow up review. NIFI-6047 latest. NIFI-6047 Finished updates First round of code review cleanup Latest Removed EL from the dynamic properties. Finished cod [...]

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit df00cc6cb576c11ae3ef0f1c6f64454598298936
Author: Mike Thomsen <mt...@apache.org>
AuthorDate: Thu Oct 29 13:52:12 2020 -0400

    NIFI-6047 Cleaned up code to allow tests to run against 1.13.0-snapshot
    Removed DMC.
    NIFI-6047 Started integrating changes from NIFI-6014.
    NIFI-6047 Added DMC tests.
    NIFI-6047 Added cache identifier recordpath test.
    NIFI-6047 Added additional details.
    NIFI-6047 Removed old additional details.
    NIFI-6047 made some changes requested in a follow up review.
    NIFI-6047 latest.
    NIFI-6047 Finished updates
    First round of code review cleanup
    Latest
    Removed EL from the dynamic properties.
    Finished code review requested refactoring.
    Checkstyle fix.
    Removed a Java 11 API
    NIFI-6047 Renamed processor to DeduplicateRecord
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4646
---
 ...DuplicateRecord.java => DeduplicateRecord.java} | 562 +++++++++++----------
 .../services/org.apache.nifi.processor.Processor   |   2 +-
 .../additionalDetails.html                         |  70 +++
 .../DetectDuplicateRecord/additionalDetails.html   |  96 ----
 .../processors/standard/MockCacheService.groovy    |  77 ---
 .../processors/standard/TestDeduplicateRecord.java | 321 ++++++++++++
 .../standard/TestDetectDuplicateRecord.java        | 209 --------
 7 files changed, 682 insertions(+), 655 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java
similarity index 53%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java
rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java
index 191a675..b055a75 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java
@@ -22,94 +22,111 @@ import com.google.common.hash.Funnels;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.codec.digest.MessageDigestAlgorithms;
-import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.*;
-import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
-import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.record.path.FieldValue;
 import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.util.RecordPathCache;
-import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
 import org.apache.nifi.record.path.validation.RecordPathValidator;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.*;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
-import java.io.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.commons.codec.binary.StringUtils.getBytesUtf8;
-import static org.apache.commons.lang3.StringUtils.*;
 
 @EventDriven
 @SupportsBatching
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @SystemResourceConsideration(resource = SystemResource.MEMORY,
-    description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " +
-        "The BloomFilter type will use constant memory regardless of the number of records processed.")
-@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique",
-    "filter", "hash", "dupe", "duplicate", "dedupe"})
-@CapabilityDescription("Caches records from each incoming FlowFile and determines if the record " +
-    "has already been seen. If so, routes the record to 'duplicate'. If the record is " +
-    "not determined to be a duplicate, it is routed to 'non-duplicate'."
+        description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " +
+                "The BloomFilter type will use constant memory regardless of the number of records processed.")
+@SystemResourceConsideration(resource = SystemResource.CPU,
+        description = "If a more advanced hash algorithm is chosen, the amount of time required to hash any particular " +
+                "record could increase substantially."
 )
+@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique",
+        "filter", "hash", "dupe", "duplicate", "dedupe"})
+@CapabilityDescription("This processor attempts to deduplicate a record set in memory using either a hashset or a bloom filter. " +
+        "It operates on a per-file basis rather than across an entire data set that spans multiple files.")
 @WritesAttribute(attribute = "record.count", description = "The number of records processed.")
 @DynamicProperty(
-    name = "RecordPath",
-    value = "An expression language statement used to determine how the RecordPath is resolved. " +
-            "The following variables are availble: ${field.name}, ${field.value}, ${field.type}",
-    description = "The name of each user-defined property must be a valid RecordPath.")
+        name = "RecordPath",
+        value = "An expression language statement used to determine how the RecordPath is resolved. " +
+                "The following variables are availible: ${field.name}, ${field.value}, ${field.type}",
+        description = "The name of each user-defined property must be a valid RecordPath.")
 @SeeAlso(classNames = {
-    "org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
-    "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
-    "org.apache.nifi.processors.standard.DetectDuplicate"
+        "org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
+        "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
+        "org.apache.nifi.processors.standard.DetectDuplicate"
 })
-public class DetectDuplicateRecord extends AbstractProcessor {
+public class DeduplicateRecord extends AbstractProcessor {
+    public static final char JOIN_CHAR = '~';
 
     private static final String FIELD_NAME = "field.name";
     private static final String FIELD_VALUE = "field.value";
     private static final String FIELD_TYPE = "field.type";
 
     private volatile RecordPathCache recordPathCache;
-    private volatile List<String> recordPaths;
+    private volatile List<PropertyDescriptor> dynamicProperties;
 
     // VALUES
 
     static final AllowableValue NONE_ALGORITHM_VALUE = new AllowableValue("none", "None",
             "Do not use a hashing algorithm. The value of resolved RecordPaths will be combined with tildes (~) to form the unique record key. " +
                     "This may use significantly more storage depending on the size and shape or your data.");
-    static final AllowableValue MD5_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.MD5, "MD5",
-            "The MD5 message-digest algorithm.");
-    static final AllowableValue SHA1_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_1, "SHA-1",
-            "The SHA-1 cryptographic hash algorithm.");
-    static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_256, "SHA-256",
+    static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_256, "SHA-256",
             "The SHA-256 cryptographic hash algorithm.");
-    static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_512, "SHA-512",
+    static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_512, "SHA-512",
             "The SHA-512 cryptographic hash algorithm.");
 
     static final AllowableValue HASH_SET_VALUE = new AllowableValue("hash-set", "HashSet",
@@ -139,56 +156,54 @@ public class DetectDuplicateRecord extends AbstractProcessor {
             .required(true)
             .build();
 
-    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
-            .name("include-zero-record-flowfiles")
-            .displayName("Include Zero Record FlowFiles")
-            .description("When converting an incoming FlowFile, if the conversion results in no data, "
-                    + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .required(true)
-            .build();
-
-    static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
-            .name("cache-the-entry-identifier")
-            .displayName("Cache The Entry Identifier")
-            .description("When true this cause the processor to check for duplicates and cache the Entry Identifier. When false, "
-                    + "the processor would only check for duplicates and not cache the Entry Identifier, requiring another "
-                    + "processor to add identifiers to the distributed cache.")
+    static final AllowableValue OPTION_SINGLE_FILE = new AllowableValue("single", "Single File");
+    static final AllowableValue OPTION_MULTIPLE_FILES = new AllowableValue("multiple", "Multiple Files");
+
+    static final PropertyDescriptor DEDUPLICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("deduplication-strategy")
+            .displayName("Deduplication Strategy")
+            .description("The strategy to use for detecting and isolating duplicate records. The option for doing it " +
+                    "across a single data file will operate in memory, whereas the one for going across the enter repository " +
+                    "will require a distributed map cache.")
+            .allowableValues(OPTION_SINGLE_FILE, OPTION_MULTIPLE_FILES)
+            .defaultValue(OPTION_SINGLE_FILE.getValue())
             .required(true)
-            .allowableValues("true", "false")
-            .defaultValue("true")
             .build();
 
-    static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
-            .name("distributed-cache-service")
-            .displayName("Distributed Cache Service")
-            .description("The Controller Service that is used to cache unique records, used to determine duplicates")
-            .required(false)
+    static final PropertyDescriptor DISTRIBUTED_MAP_CACHE = new PropertyDescriptor.Builder()
+            .name("distributed-map-cache")
+            .displayName("Distributed Map Cache client")
+            .description("This configuration is required when the deduplication strategy is set to 'multiple files.' The map " +
+                    "cache will be used to check a data source such as HBase or Redis for entries indicating that a record has " +
+                    "been processed before. This option requires a downstream process that uses PutDistributedMapCache to write " +
+                    "an entry to the cache data source once the record has been processed to indicate that it has been handled before.")
             .identifiesControllerService(DistributedMapCacheClient.class)
+            .required(false)
+            .addValidator(Validator.VALID)
+            .dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES)
             .build();
 
-    static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
-            .name("cache-entry-identifier")
-            .displayName("Cache Entry Identifier")
-            .description(
-                    "A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated " +
-                            "against a FlowFile in order to determine the cached filter type value used to identify duplicates.")
-            .required(false)
-            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
-            .defaultValue("${hash.value}")
+    static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
+            .name("cache-identifier")
+            .displayName("Cache Identifier")
+            .description("This option defines a record path operation to use for defining the cache identifier. It can be used " +
+                    "in addition to the hash settings. This field will have the expression language attribute \"record.hash.value\" " +
+                    "available to it to use with it to generate the record path operation.")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .addValidator(Validator.VALID)
+            .dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES)
             .build();
 
-    static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder()
-            .name("age-off-duration")
-            .displayName("Age Off Duration")
-            .description("Time interval to age off cached filter entries. When the cache expires, the entire filter and its values " +
-                    "are destroyed. Leaving this value empty will cause the cached entries to never expire but may eventually be rotated " +
-                    "out when the cache servers rotation policy automatically expires entries.")
-            .required(false)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
+            .name("include-zero-record-flowfiles")
+            .displayName("Include Zero Record FlowFiles")
+            .description("When converting an incoming FlowFile, if the conversion results in no data, "
+                    + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
             .build();
 
     static final PropertyDescriptor RECORD_HASHING_ALGORITHM = new PropertyDescriptor.Builder()
@@ -197,12 +212,10 @@ public class DetectDuplicateRecord extends AbstractProcessor {
             .description("The algorithm used to hash the combined set of resolved RecordPath values for cache storage.")
             .allowableValues(
                     NONE_ALGORITHM_VALUE,
-                    MD5_ALGORITHM_VALUE,
-                    SHA1_ALGORITHM_VALUE,
                     SHA256_ALGORITHM_VALUE,
                     SHA512_ALGORITHM_VALUE
             )
-            .defaultValue(SHA1_ALGORITHM_VALUE.getValue())
+            .defaultValue(SHA256_ALGORITHM_VALUE.getValue())
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .required(true)
             .build();
@@ -210,12 +223,16 @@ public class DetectDuplicateRecord extends AbstractProcessor {
     static final PropertyDescriptor FILTER_TYPE = new PropertyDescriptor.Builder()
             .name("filter-type")
             .displayName("Filter Type")
-            .description("The filter used to determine whether a record has been seen before based on the matching RecordPath criteria.")
+            .description("The filter used to determine whether a record has been seen before based on the matching RecordPath " +
+                    "criteria. If hash set is selected, a Java HashSet object will be used to deduplicate all encountered " +
+                    "records. If the bloom filter option is selected, a bloom filter will be used. The bloom filter option is " +
+                    "less memory intensive, but has a chance of having false positives.")
             .allowableValues(
                     HASH_SET_VALUE,
                     BLOOM_FILTER_VALUE
             )
             .defaultValue(HASH_SET_VALUE.getValue())
+            .dependsOn(DEDUPLICATION_STRATEGY, OPTION_SINGLE_FILE)
             .required(true)
             .build();
 
@@ -227,6 +244,7 @@ public class DetectDuplicateRecord extends AbstractProcessor {
             .defaultValue("25000")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .dependsOn(FILTER_TYPE, BLOOM_FILTER_VALUE)
             .required(true)
             .build();
 
@@ -269,20 +287,15 @@ public class DetectDuplicateRecord extends AbstractProcessor {
 
     private Set<Relationship> relationships;
 
-    private final Serializer<String> keySerializer = new StringSerializer();
-    private final Serializer<CacheValue> cacheValueSerializer = new CacheValueSerializer();
-    private final Deserializer<CacheValue> cacheValueDeserializer = new CacheValueDeserializer();
-
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(DEDUPLICATION_STRATEGY);
+        descriptors.add(DISTRIBUTED_MAP_CACHE);
+        descriptors.add(CACHE_IDENTIFIER);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(INCLUDE_ZERO_RECORD_FLOWFILES);
-        descriptors.add(CACHE_IDENTIFIER);
-        descriptors.add(CACHE_ENTRY_IDENTIFIER);
-        descriptors.add(AGE_OFF_DURATION);
-        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
         descriptors.add(RECORD_HASHING_ALGORITHM);
         descriptors.add(FILTER_TYPE);
         descriptors.add(FILTER_CAPACITY_HINT);
@@ -318,24 +331,20 @@ public class DetectDuplicateRecord extends AbstractProcessor {
                         "to access information about the field and the value of the field being evaluated.")
                 .required(false)
                 .dynamic(true)
+                .addValidator(new RecordPathValidator())
                 .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-                .addValidator(new RecordPathPropertyNameValidator())
                 .build();
     }
 
     @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
         RecordPathValidator recordPathValidator = new RecordPathValidator();
-        final List<ValidationResult> validationResults = validationContext.getProperties().keySet().stream()
-                .filter(PropertyDescriptor::isDynamic)
-                .map(property -> recordPathValidator.validate(
-                        "User-defined Properties",
-                        property.getName(),
-                        validationContext
-                )).collect(Collectors.toList());
-
-        if(validationContext.getProperty(BLOOM_FILTER_FPP).isSet()) {
-            final double falsePositiveProbability = validationContext.getProperty(BLOOM_FILTER_FPP).asDouble();
+        List<ValidationResult> validationResults = new ArrayList<>();
+
+        boolean useSingleFile = context.getProperty(DEDUPLICATION_STRATEGY).getValue().equals(OPTION_SINGLE_FILE.getValue());
+
+        if (useSingleFile && context.getProperty(BLOOM_FILTER_FPP).isSet()) {
+            final double falsePositiveProbability = context.getProperty(BLOOM_FILTER_FPP).asDouble();
             if (falsePositiveProbability < 0 || falsePositiveProbability > 1) {
                 validationResults.add(
                         new ValidationResult.Builder()
@@ -344,42 +353,62 @@ public class DetectDuplicateRecord extends AbstractProcessor {
                                 .explanation("Valid values are 0.0 - 1.0 inclusive")
                                 .valid(false).build());
             }
-        }
-
-        if(validationContext.getProperty(CACHE_IDENTIFIER).asBoolean()) {
-            if(!validationContext.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet())
+        } else if (!useSingleFile) {
+            if (!context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) {
                 validationResults.add(new ValidationResult.Builder()
-                        .subject(DISTRIBUTED_CACHE_SERVICE.getName())
-                        .explanation(DISTRIBUTED_CACHE_SERVICE.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
-                        .valid(false).build());
-
-            if(!validationContext.getProperty(CACHE_ENTRY_IDENTIFIER).isSet())
-                validationResults.add(new ValidationResult.Builder()
-                        .subject(CACHE_ENTRY_IDENTIFIER.getName())
-                        .explanation(CACHE_ENTRY_IDENTIFIER.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
-                        .valid(false).build());
-
-            if(!validationContext.getProperty(AGE_OFF_DURATION).isSet())
-                validationResults.add(new ValidationResult.Builder()
-                        .subject(AGE_OFF_DURATION.getName())
-                        .explanation(AGE_OFF_DURATION.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
+                        .subject(DISTRIBUTED_MAP_CACHE.getName())
+                        .explanation("Multiple files deduplication was chosen, but a distributed map cache client was " +
+                                "not configured")
                         .valid(false).build());
+            }
         }
 
         return validationResults;
     }
 
-    @OnScheduled
-    public void compileRecordPaths(final ProcessContext context) {
-        final List<String> recordPaths = new ArrayList<>();
+    private DistributedMapCacheClient mapCacheClient;
+    private RecordReaderFactory readerFactory;
+    private RecordSetWriterFactory writerFactory;
 
-        recordPaths.addAll(context.getProperties().keySet().stream()
+    private boolean useInMemoryStrategy;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        dynamicProperties = context.getProperties().keySet().stream()
                 .filter(PropertyDescriptor::isDynamic)
-                .map(PropertyDescriptor::getName)
-                .collect(toList()));
+                .collect(Collectors.toList());
+
+        int cacheSize = dynamicProperties.size();
+
+        recordPathCache = new RecordPathCache(cacheSize);
 
-        recordPathCache = new RecordPathCache(recordPaths.size());
-        this.recordPaths = recordPaths;
+        if (context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) {
+            mapCacheClient = context.getProperty(DISTRIBUTED_MAP_CACHE).asControllerService(DistributedMapCacheClient.class);
+        }
+
+        readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        String strategy = context.getProperty(DEDUPLICATION_STRATEGY).getValue();
+
+        useInMemoryStrategy = strategy.equals(OPTION_SINGLE_FILE.getValue());
+    }
+
+    private FilterWrapper getFilter(ProcessContext context) {
+        if (useInMemoryStrategy) {
+            boolean useHashSet = context.getProperty(FILTER_TYPE).getValue()
+                    .equals(context.getProperty(HASH_SET_VALUE.getValue()));
+            final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger();
+            return useHashSet
+                    ? new HashSetFilterWrapper(new HashSet<>(filterCapacity))
+                    : new BloomFilterWrapper(BloomFilter.create(
+                    Funnels.stringFunnel(Charset.defaultCharset()),
+                    filterCapacity,
+                    context.getProperty(BLOOM_FILTER_FPP).asDouble()
+            ));
+        } else {
+            return new DistributedMapCacheClientWrapper(mapCacheClient);
+        }
     }
 
     @Override
@@ -390,44 +419,29 @@ public class DetectDuplicateRecord extends AbstractProcessor {
         }
 
         final ComponentLog logger = getLogger();
-        final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
-
-        if (isBlank(cacheKey)) {
-            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
-            session.transfer(session.penalize(flowFile), REL_FAILURE);
-            return;
-        }
 
         FlowFile nonDuplicatesFlowFile = session.create(flowFile);
         FlowFile duplicatesFlowFile = session.create(flowFile);
 
-        try {
-            final long now = System.currentTimeMillis();
-            final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
-            final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean();
-            final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger();
-            Serializable serializableFilter = context.getProperty(FILTER_TYPE).getValue()
-                    .equals(context.getProperty(HASH_SET_VALUE.getValue()))
-                    ? new HashSet<String>(filterCapacity)
-                    : BloomFilter.create(
-                    Funnels.stringFunnel(Charset.defaultCharset()),
-                    filterCapacity,
-                    context.getProperty(BLOOM_FILTER_FPP).asDouble());
-
-            if(shouldCacheIdentifier && cache.containsKey(cacheKey, keySerializer)) {
-                CacheValue cacheValue = cache.get(cacheKey, keySerializer, cacheValueDeserializer);
-                Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
-
-                if(durationMS != null && (now >= cacheValue.getEntryTimeMS() + durationMS)) {
-                    boolean status = cache.remove(cacheKey, keySerializer);
-                    logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
-                } else {
-                    serializableFilter = cacheValue.getFilter();
-                }
-            }
-
-            final FilterWrapper filter = FilterWrapper.create(serializableFilter);
+        long index = 0;
+
+        WriteResult nonDuplicatesWriteResult = null;
+        WriteResult duplicatesWriteResult = null;
+        String duplicateMimeType = null;
+        String nonDuplicateMimeType = null;
+
+        boolean error = false;
+        try (
+                final InputStream inputStream = session.read(flowFile);
+                final RecordReader reader = readerFactory.createRecordReader(flowFile, inputStream, logger);
+                final OutputStream nonDupeStream = session.write(nonDuplicatesFlowFile);
+                final OutputStream dupeStream = session.write(duplicatesFlowFile);
+                final RecordSetWriter nonDuplicatesWriter = writerFactory
+                        .createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), nonDupeStream, nonDuplicatesFlowFile);
+                final RecordSetWriter duplicatesWriter = writerFactory
+                        .createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), dupeStream, duplicatesFlowFile);
+        ) {
+            final FilterWrapper filter = getFilter(context);
 
             final String recordHashingAlgorithm = context.getProperty(RECORD_HASHING_ALGORITHM).getValue();
             final MessageDigest messageDigest = recordHashingAlgorithm.equals(NONE_ALGORITHM_VALUE.getValue())
@@ -435,14 +449,6 @@ public class DetectDuplicateRecord extends AbstractProcessor {
                     : DigestUtils.getDigest(recordHashingAlgorithm);
             final Boolean matchWholeRecord = context.getProperties().keySet().stream().noneMatch(p -> p.isDynamic());
 
-            final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-            final RecordReader reader = readerFactory.createRecordReader(flowFile.getAttributes(), session.read(flowFile), logger);
-
-            final RecordSchema writeSchema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
-            final RecordSetWriter nonDuplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(nonDuplicatesFlowFile));
-            final RecordSetWriter duplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(duplicatesFlowFile));
-
             nonDuplicatesWriter.beginRecordSet();
             duplicatesWriter.beginRecordSet();
             Record record;
@@ -451,114 +457,125 @@ public class DetectDuplicateRecord extends AbstractProcessor {
                 String recordValue;
 
                 if (matchWholeRecord) {
-                    recordValue = Joiner.on('~').join(record.getValues());
+                    recordValue = Joiner.on(JOIN_CHAR).join(record.getValues());
                 } else {
-                    final List<String> fieldValues = new ArrayList<>();
-                    for (final String recordPathText : recordPaths) {
-                        final PropertyValue recordPathPropertyValue = context.getProperty(recordPathText);
-                        final RecordPath recordPath = recordPathCache.getCompiled(recordPathText);
-                        final RecordPathResult result = recordPath.evaluate(record);
-                        final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
-
-                        if(recordPathPropertyValue.isExpressionLanguagePresent()) {
-                            final Map<String, String> fieldVariables = new HashMap<>();
-                            selectedFields.forEach(fieldVal -> {
-                                fieldVariables.clear();
-                                fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
-                                fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
-                                fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
-
-                                fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue());
-                            });
-                        } else {
-                            fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue());
-                        }
-
-                        fieldValues.addAll(selectedFields.stream()
-                                        .map(f -> recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue())
-                                        .collect(toList())
-                        );
-                    }
-                    recordValue = Joiner.on('~').join(fieldValues);
+                    recordValue = executeDynamicRecordPaths(context, record, flowFile);
                 }
 
-                final String recordHash = messageDigest != null
+                String recordHash = messageDigest != null
                         ? Hex.encodeHexString(messageDigest.digest(getBytesUtf8(recordValue)))
                         : recordValue;
+                messageDigest.reset();
+
+                if (!useInMemoryStrategy && context.getProperty(CACHE_IDENTIFIER).isSet()) {
+                    Map<String, String> additional = new HashMap<>();
+                    additional.put("record.hash.value", recordHash);
+                    String rawPath = context.getProperty(CACHE_IDENTIFIER).evaluateAttributeExpressions(flowFile, additional).getValue();
+                    RecordPath compiled = recordPathCache.getCompiled(rawPath);
+                    RecordPathResult result = compiled.evaluate(record);
+                    FieldValue fieldValue = result.getSelectedFields().findFirst().get();
+                    if (fieldValue.getValue() == null) {
+                        throw new ProcessException(String.format("The path \"%s\" failed to create an ID value at record index %d", rawPath, index));
+                    }
+
+                    recordHash = fieldValue.getValue().toString();
+                }
 
-                if(filter.contains(recordHash)) {
+                if (filter.contains(recordHash)) {
                     duplicatesWriter.write(record);
                 } else {
                     nonDuplicatesWriter.write(record);
+                    filter.put(recordHash);
                 }
 
-                filter.put(recordHash);
+                index++;
             }
 
-            final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet()
-                    ? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()
-                    : true;
-
+            duplicateMimeType = duplicatesWriter.getMimeType();
+            nonDuplicateMimeType = nonDuplicatesWriter.getMimeType();
             // Route Non-Duplicates FlowFile
-            final WriteResult nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet();
-            nonDuplicatesWriter.close();
-            Map<String, String> attributes = new HashMap<>();
-            attributes.putAll(nonDuplicatesWriteResult.getAttributes());
-            attributes.put("record.count", String.valueOf(nonDuplicatesWriteResult.getRecordCount()));
-            attributes.put(CoreAttributes.MIME_TYPE.key(), nonDuplicatesWriter.getMimeType());
-            nonDuplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes);
-            logger.info("Successfully found {} unique records for {}", new Object[] {nonDuplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile});
+            nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet();
+            // Route Duplicates FlowFile
+            duplicatesWriteResult = duplicatesWriter.finishRecordSet();
 
-            if(!includeZeroRecordFlowFiles && nonDuplicatesWriteResult.getRecordCount() == 0) {
-                session.remove(nonDuplicatesFlowFile);
+        } catch (final Exception e) {
+            logger.error("Failed in detecting duplicate records at index " + index, e);
+            error = true;
+        } finally {
+            if (!error) {
+                final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean();
+
+                session.adjustCounter("Records Processed",
+                        nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false);
+
+                sendOrRemove(session, duplicatesFlowFile, REL_DUPLICATE, duplicateMimeType,
+                        includeZeroRecordFlowFiles, duplicatesWriteResult);
+
+                sendOrRemove(session, nonDuplicatesFlowFile, REL_NON_DUPLICATE, nonDuplicateMimeType,
+                        includeZeroRecordFlowFiles, nonDuplicatesWriteResult);
+
+                session.transfer(flowFile, REL_ORIGINAL);
             } else {
-                session.transfer(nonDuplicatesFlowFile, REL_NON_DUPLICATE);
+                session.remove(duplicatesFlowFile);
+                session.remove(nonDuplicatesFlowFile);
+                session.transfer(flowFile, REL_FAILURE);
             }
+        }
+    }
 
-            // Route Duplicates FlowFile
-            final WriteResult duplicatesWriteResult = duplicatesWriter.finishRecordSet();
-            duplicatesWriter.close();
-            attributes.clear();
-            attributes.putAll(duplicatesWriteResult.getAttributes());
-            attributes.put("record.count", String.valueOf(duplicatesWriteResult.getRecordCount()));
-            attributes.put(CoreAttributes.MIME_TYPE.key(), duplicatesWriter.getMimeType());
-            duplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes);
-            logger.info("Successfully found {} duplicate records for {}", new Object[] {duplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile});
-
-            if(!includeZeroRecordFlowFiles && duplicatesWriteResult.getRecordCount() == 0) {
-                session.remove(duplicatesFlowFile);
-            } else {
-                session.transfer(duplicatesFlowFile, REL_DUPLICATE);
+    private void sendOrRemove(ProcessSession session,
+                              FlowFile outputFlowFile,
+                              Relationship targetRelationship,
+                              String mimeType,
+                              boolean includeZeroRecordFlowFiles,
+                              WriteResult writeResult) {
+        if (!includeZeroRecordFlowFiles && writeResult.getRecordCount() == 0) {
+            session.remove(outputFlowFile);
+        } else {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+            outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug("Successfully found {} unique records for {}",
+                        writeResult.getRecordCount(), outputFlowFile);
             }
 
-            session.adjustCounter("Records Processed",
-                    nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false);
+            session.transfer(outputFlowFile, targetRelationship);
+        }
+    }
 
-            if(shouldCacheIdentifier) {
-                CacheValue cacheValue = new CacheValue(serializableFilter, now);
-                cache.put(cacheKey, cacheValue, keySerializer, cacheValueSerializer);
-            }
+    private String executeDynamicRecordPaths(ProcessContext context, Record record, FlowFile flowFile) {
+        final List<String> fieldValues = new ArrayList<>();
+        for (final PropertyDescriptor propertyDescriptor : dynamicProperties) {
+            final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+            final RecordPath recordPath = recordPathCache.getCompiled(value);
+            final RecordPathResult result = recordPath.evaluate(record);
+            final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
 
-            session.transfer(flowFile, REL_ORIGINAL);
+            fieldValues.add(propertyDescriptor.getName());
 
-        } catch (final Exception e) {
-            logger.error("Failed in detecting duplicate records.", e);
-            session.remove(duplicatesFlowFile);
-            session.remove(nonDuplicatesFlowFile);
-            session.transfer(flowFile, REL_FAILURE);
-            return;
+            fieldValues.addAll(selectedFields.stream()
+                    .map(f -> f.getValue().toString())
+                    .collect(toList())
+            );
         }
+
+        return Joiner.on(JOIN_CHAR).join(fieldValues);
     }
 
     private abstract static class FilterWrapper {
         public static FilterWrapper create(Object filter) {
-            if(filter instanceof HashSet) {
+            if (filter instanceof HashSet) {
                 return new HashSetFilterWrapper((HashSet<String>) filter);
             } else {
                 return new BloomFilterWrapper((BloomFilter<String>) filter);
             }
         }
+
         public abstract boolean contains(String value);
+
         public abstract void put(String value);
     }
 
@@ -600,6 +617,34 @@ public class DetectDuplicateRecord extends AbstractProcessor {
         }
     }
 
+    private static class DistributedMapCacheClientWrapper extends FilterWrapper {
+        private DistributedMapCacheClient client;
+
+        public DistributedMapCacheClientWrapper(DistributedMapCacheClient client) {
+            this.client = client;
+        }
+
+        @Override
+        public boolean contains(String value) {
+            try {
+                return client.containsKey(value, STRING_SERIALIZER);
+            } catch (IOException e) {
+                throw new ProcessException("Distributed Map lookup failed", e);
+            }
+        }
+
+        @Override
+        public void put(String value) {
+            /*
+             * This needs to be a noop because this process will be used upstream of the systems that would write the records
+             * that power the map cache.
+             */
+        }
+    }
+
+    private static final Serializer<String> STRING_SERIALIZER = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+    private static final Serializer<Boolean> BOOLEAN_SERIALIZER = (value, output) -> output.write((byte) (value ? 1 : 0));
+
     private static class CacheValue implements Serializable {
 
         private final Serializable filter;
@@ -618,31 +663,4 @@ public class DetectDuplicateRecord extends AbstractProcessor {
             return entryTimeMS;
         }
     }
-
-    private static class CacheValueSerializer implements Serializer<CacheValue> {
-        @Override
-        public void serialize(CacheValue cacheValue, OutputStream outputStream) throws SerializationException, IOException {
-            new ObjectOutputStream(outputStream).writeObject(cacheValue);
-        }
-    }
-
-    private static class CacheValueDeserializer implements Deserializer<CacheValue> {
-        @Override
-        public CacheValue deserialize(byte[] bytes) throws DeserializationException, IOException {
-            try {
-                return (CacheValue) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
-            } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-            }
-            return null;
-        }
-    }
-
-    private static class StringSerializer implements Serializer<String> {
-
-        @Override
-        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
-            out.write(value.getBytes(StandardCharsets.UTF_8));
-        }
-    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0324a5b..160d552 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -26,7 +26,7 @@ org.apache.nifi.processors.standard.CryptographicHashAttribute
 org.apache.nifi.processors.standard.CryptographicHashContent
 org.apache.nifi.processors.standard.DebugFlow
 org.apache.nifi.processors.standard.DetectDuplicate
-org.apache.nifi.processors.standard.DetectDuplicateRecord
+org.apache.nifi.processors.standard.DeduplicateRecord
 org.apache.nifi.processors.standard.DistributeLoad
 org.apache.nifi.processors.standard.DuplicateFlowFile
 org.apache.nifi.processors.standard.EncryptContent
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html
new file mode 100644
index 0000000..07b1f64
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html
@@ -0,0 +1,70 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>DeduplicateRecords</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+    <h1>Overview</h1>
+    <p>This processor provides deduplication across either a single record set file, across several files or even across an entire data lake
+    using a DistributedMapCacheClient controller service. In the case of the former, it uses either a HashSet or a bloom
+    filter to provide extremely fast in-memory calculations with a high degree of accuracy. In the latter use case, it
+    will use the controller service to compare a generated hash against a map cache stored in one of the supported caching
+    options that Apache NiFi offers.</p>
+
+    <h2>Configuring single file deduplication</h2>
+    <p>Choose the "single file" option under the configuration property labeled "Deduplication Strategy." Then choose
+    whether to use a bloom filter or hash set. Be mindful to set size limits that are in line with the average size of the
+    record sets that you process.</p>
+
+    <h2>Configuring multi-file deduplication</h2>
+    <p>Select the "Multiple Files" option under "Deduplication Strategy" and then configure a DistributedMapCacheClient service.
+    It is possible to configure a cache identifier in multiple ways:</p>
+    <ol>
+        <li>Generate a hash of the entire record by specifying no dynamic properties.</li>
+        <li>Generate a hash using dynamic properties to specify particular fields to use.</li>
+        <li>Manually specify a single record path statement in the cache identifier property. Note:
+            <ul>
+                <li>This can be chained with #1 and #2 because it supports expression language and exposes the computed
+                hash from #1 or #2 as the EL variable <em>record.hash.value</em>. Example:
+                <em>concat('${some.var}', -,  '${record.hash.value}')</em>
+                </li>
+            </ul>
+        </li>
+    </ol>
+    <h2>The role of dynamic properties</h2>
+    <p>Dynamic properties should have a human-readable name for the property name and a record path operation for the
+    value. The record path operations will be used to extract values from the record to assemble a unique identifier. Here is an example:</p>
+    <ul>
+        <li>firstName => /FirstName</li>
+        <li>lastName => /LastName</li>
+    </ul>
+    <p>Record:</p>
+    <pre>
+        {
+            "firstName": "John",
+            "lastName": "Smith"
+        }
+    </pre>
+    <p>Will yield an identifier that has "John" and "Smith" in it before a hash is generated from the final value.</p>
+    <p>If any record path is missing, it will cause an exception to be raised and the flowfile will be sent to the
+    failure relationship.</p>
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html
deleted file mode 100644
index 8f24302..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html
+++ /dev/null
@@ -1,96 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<head>
-    <meta charset="utf-8" />
-    <title>DetectDuplicateRecord</title>
-
-    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
-</head>
-<body>
-    <p>This processor makes use of the NiFi RecordPath Domain-Specific Language (DSL) to allow the user to
-        indicate which field(s) in the Record should be used to determine uniqueness. Users do this by adding
-        a User-defined Property to the Processor's configuration. The name of the User-defined Property must
-        be the RecordPath text that should be evaluated against each Record. All of the values identified by
-        the record paths are hashed together in the order they were specified to derive a unique value
-        representing a single Record. This hashed value is then optionally stored in the cache for
-        subsequent FlowFile processing.</p>
-    <p>If a RecordPath is given and does not match any field in an input Record, that Property will be
-        skipped and all other Properties will still be evaluated. If the RecordPath matches no fields the
-        record will be routed to the 'non-duplicate' relationship. If no User-defined Properties specifying
-        a RecordPath are defined, all field values of the record will be used.</p>
-    <p>After all RecordPath values are resolved, the values are combined in the order of the User-defined
-        Properties and hashed together using the specified hashing algorithm, ensuring constant space per record.</p>
-
-    <h2>Choosing a Filter Type</h2>
-    <p></p>
-    <h2>Examples</h2>
-    <p>Below, we lay out some examples in order to provide clarity about the Processor's behavior.
-        For all of the examples below, consider the example to operate on the following set of 2 (JSON) records:</p>
-    <code>
-    <pre>
-        [
-            {
-                "id": 1,
-                "name": "John",
-                "gender": "M",
-            },
-            {
-                "id": 2,
-                "name": "Susan",
-                "gender": "F",
-            },
-            {
-                "id": 3,
-                "name": "Megan",
-                "gender": "F",
-            },
-            {
-                "id": 2,
-                "name": "Jerry",
-                "gender": "M",
-            },
-        ]
-    </pre>
-    </code>
-
-    <h3>Example 1: Matching on a Single Record Field</h3>
-    <p>A valid property RecordPath mapping would be <em>/id => ${field.value}</em>.</p>
-    <p>For a record set with JSON like that, the records will be evaluated against the <code>id</code> field
-        to determine uniqueness.</p>
-    <ul>
-        <li><strong>non-duplicate:</strong> John, Susan, Megan</li>
-        <li><strong>duplicate:</strong> Jerry</li>
-    </ul>
-
-    <h3>Example 2: Matching on Multiple Record Fields</h3>
-    <p>If we wanted to define these records to be unique based on the <code>id</code> and <code>gender</code> fields,
-        we would specify two RecordPath mappings: <em>/id => ${field.value}</em> and <em>/gender => ${field.value}</em>.</p>
-    <ul>
-        <li><strong>non-duplicate:</strong> John, Susan, Megan, Jerry</li>
-        <li><strong>duplicate:</strong> <em>None</em></li>
-    </ul>
-
-    <h3>Example 3: Matching on All Record Fields</h3>
-    <p>Do not define any RecordPath properties in the processor to use all fields by default.</p>
-    <p>For a record set with JSON like that, the records will be evaluated against the <code>id, name, gender</code>
-        fields to determine uniqueness.</p>
-    <ul>
-        <li><strong>non-duplicate:</strong> John, Susan, Megan, Jerry</li>
-        <li><strong>duplicate:</strong> <em>None</em></li>
-    </ul>
-</body>
-</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy
deleted file mode 100644
index d0f5d09..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard
-
-import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.distributed.cache.client.Deserializer
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
-import org.apache.nifi.distributed.cache.client.Serializer
-
-class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient {
-    def map = [:]
-
-    @Override
-    def <K, V> boolean putIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer1) throws IOException {
-        def retVal = map.containsKey(k)
-        if (retVal) {
-            false
-        } else {
-            map[k] = v
-            true
-        }
-    }
-
-    @Override
-    def <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer1, Deserializer<V> deserializer) throws IOException {
-        return null
-    }
-
-    @Override
-    def <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
-        return map.containsKey(k)
-    }
-
-    @Override
-    def <K, V> void put(K k, V v, Serializer<K> serializer, Serializer<V> serializer1) throws IOException {
-
-    }
-
-    @Override
-    def <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
-        return null
-    }
-
-    @Override
-    void close() throws IOException {
-
-    }
-
-    @Override
-    def <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
-        return false
-    }
-
-    @Override
-    long removeByPattern(String s) throws IOException {
-        return 0
-    }
-
-    void assertContains(String key, String value) {
-        assert map.containsKey(key) && map[key] == value
-    }
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
new file mode 100644
index 0000000..9de152f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDeduplicateRecord {
+
+    private TestRunner runner;
+    private MockRecordParser reader;
+    private MockRecordWriter writer;
+
+    @BeforeEach
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(DeduplicateRecord.class);
+
+        // RECORD_READER, RECORD_WRITER
+        reader = new MockRecordParser();
+        writer = new MockRecordWriter("header", false);
+
+        runner.addControllerService("reader", reader);
+        runner.enableControllerService(reader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(DeduplicateRecord.RECORD_READER, "reader");
+        runner.setProperty(DeduplicateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(DeduplicateRecord.RECORD_HASHING_ALGORITHM, DeduplicateRecord.SHA256_ALGORITHM_VALUE);
+
+        reader.addSchemaField("firstName", RecordFieldType.STRING);
+        reader.addSchemaField("middleName", RecordFieldType.STRING);
+        reader.addSchemaField("lastName", RecordFieldType.STRING);
+
+        // INCLUDE_ZERO_RECORD_FLOWFILES
+        runner.setProperty(DeduplicateRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true");
+
+        runner.assertValid();
+    }
+
+    void commonEnqueue() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("hash.value", "1000");
+        runner.enqueue(new byte[]{}, props);
+    }
+
+    @Test
+    public void testInvalidRecordPathCausesValidationError() {
+        runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
+        runner.setProperty("middle_name", "//////middleName");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testDetectDuplicatesHashSet() {
+        commonEnqueue();
+
+        runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
+        runner.setProperty("middle_name", "/middleName");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jane", "X", "Doe");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 2, 1);
+    }
+
+    @Test
+    public void testDetectDuplicatesBloomFilter() {
+        commonEnqueue();
+        runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE);
+        runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10");
+        runner.setProperty("middle_name", "/middleName");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jane", "X", "Doe");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 2, 1);
+    }
+
+    @Test
+    public void testNoDuplicatesHashSet() {
+        commonEnqueue();
+        runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
+        runner.setProperty("middle_name", "/middleName");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jane", "X", "Doe");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 3, 0);
+    }
+
+    @Test
+    public void testNoDuplicatesBloomFilter() {
+        commonEnqueue();
+        runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE);
+        runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10");
+        runner.setProperty("middle_name", "/middleName");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jane", "X", "Doe");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 3, 0);
+    }
+
+    @Test
+    public void testAllDuplicates() {
+        commonEnqueue();
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("John", "Q", "Smith");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 1, 2);
+    }
+
+    @Test
+    public void testAllUnique() {
+        commonEnqueue();
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jane", "X", "Doe");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 3, 0);
+    }
+
+    @Test
+    public void testCacheValueFromRecordPath() {
+        commonEnqueue();
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jack", "Z", "Brown");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 2, 1);
+    }
+
+    /*
+     * These are all related to NIFI-6014
+     */
+
+    @Test
+    public void testMultipleFileDeduplicationRequiresDMC() {
+        runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
+        runner.assertNotValid();
+    }
+
+    public static final String FIRST_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList(
+            "John", "Q", "Smith"
+    )));
+    public static final String SECOND_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList(
+            "Jack", "Z", "Brown"
+    )));
+
+    @Test
+    public void testDeduplicateWithDMC() throws Exception {
+        DistributedMapCacheClient dmc = new MockCacheService<>();
+        runner.addControllerService("dmc", dmc);
+        runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc");
+        runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
+        runner.enableControllerService(dmc);
+        runner.assertValid();
+
+        dmc.put(FIRST_KEY, true, null, null);
+        dmc.put(SECOND_KEY, true, null, null);
+
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jane", "X", "Doe");
+
+        runner.enqueue("");
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 1, 3);
+    }
+
+    @Test
+    public void testDeduplicateWithDMCAndCacheIdentifier() throws Exception {
+        DistributedMapCacheClient dmc = new MockCacheService<>();
+        runner.addControllerService("dmc", dmc);
+        runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc");
+        runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
+        runner.setProperty(DeduplicateRecord.CACHE_IDENTIFIER, "concat('${user.name}', '${record.hash.value}')");
+        runner.enableControllerService(dmc);
+        runner.assertValid();
+
+        dmc.put(String.format("john.smith-%s", FIRST_KEY), true, null, null);
+        dmc.put(String.format("john.smith-%s", SECOND_KEY), true, null, null);
+
+        reader.addRecord("John", "Q", "Smith");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jack", "Z", "Brown");
+        reader.addRecord("Jane", "X", "Doe");
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("user.name", "john.smith-");
+
+        runner.enqueue("", attrs);
+        runner.run();
+
+        doCountTests(0, 1, 1, 1, 1, 3);
+    }
+
+    void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) {
+        runner.assertTransferCount(DeduplicateRecord.REL_DUPLICATE, duplicates);
+        runner.assertTransferCount(DeduplicateRecord.REL_NON_DUPLICATE, notDuplicates);
+        runner.assertTransferCount(DeduplicateRecord.REL_ORIGINAL, original);
+        runner.assertTransferCount(DeduplicateRecord.REL_FAILURE, failure);
+
+        List<MockFlowFile> duplicateFlowFile = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_DUPLICATE);
+        if (duplicateFlowFile != null) {
+            assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count"));
+        }
+
+        List<MockFlowFile> nonDuplicateFlowFile = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_NON_DUPLICATE);
+        if (nonDuplicateFlowFile != null) {
+            assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count"));
+        }
+    }
+
+    private static final class MockCacheService<K, V> extends AbstractControllerService implements DistributedMapCacheClient {
+        private Map storage;
+
+        public MockCacheService() {
+            storage = new HashMap<>();
+        }
+
+        @Override
+        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            return false;
+        }
+
+        @Override
+        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+            return null;
+        }
+
+        @Override
+        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+            return storage.containsKey(key);
+        }
+
+        @Override
+        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+            storage.put(key, value);
+        }
+
+        @Override
+        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        @Override
+        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+            return false;
+        }
+
+        @Override
+        public long removeByPattern(String regex) throws IOException {
+            return 0;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java
deleted file mode 100644
index 6855857..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.record.MockRecordParser;
-import org.apache.nifi.serialization.record.MockRecordWriter;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.util.*;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.processors.standard.DetectDuplicateRecord.*;
-import static org.junit.Assert.assertEquals;
-
-public class TestDetectDuplicateRecord {
-
-    private TestRunner runner;
-    private MockCacheService cache;
-    private MockRecordParser reader;
-    private MockRecordWriter writer;
-
-    @BeforeClass
-    public static void beforeClass() {
-        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
-        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicateRecord", "debug");
-        System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicateRecord", "debug");
-    }
-
-    @Before
-    public void setup() throws InitializationException {
-        runner = TestRunners.newTestRunner(DetectDuplicateRecord.class);
-
-        // RECORD_READER, RECORD_WRITER
-        reader = new MockRecordParser();
-        writer = new MockRecordWriter("header", false);
-
-        runner.addControllerService("reader", reader);
-        runner.enableControllerService(reader);
-        runner.addControllerService("writer", writer);
-        runner.enableControllerService(writer);
-
-        runner.setProperty(RECORD_READER, "reader");
-        runner.setProperty(RECORD_WRITER, "writer");
-
-        reader.addSchemaField("firstName", RecordFieldType.STRING);
-        reader.addSchemaField("middleName", RecordFieldType.STRING);
-        reader.addSchemaField("lastName", RecordFieldType.STRING);
-
-        // INCLUDE_ZERO_RECORD_FLOWFILES
-        runner.setProperty(INCLUDE_ZERO_RECORD_FLOWFILES, "true");
-
-        // CACHE_IDENTIFIER
-        runner.setProperty(CACHE_IDENTIFIER, "true");
-
-        // DISTRIBUTED_CACHE_SERVICE
-        cache = new MockCacheService();
-        runner.addControllerService("cache", cache);
-        runner.setProperty(DISTRIBUTED_CACHE_SERVICE, "cache");
-        runner.enableControllerService(cache);
-
-        // CACHE_ENTRY_IDENTIFIER
-        final Map<String, String> props = new HashMap<>();
-        props.put("hash.value", "1000");
-        runner.enqueue(new byte[]{}, props);
-
-        // AGE_OFF_DURATION
-        runner.setProperty(AGE_OFF_DURATION, "48 hours");
-
-        runner.assertValid();
-    }
-
-     @Test
-     public void testDetectDuplicatesHashSet() {
-        runner.setProperty(FILTER_TYPE, HASH_SET_VALUE);
-        runner.setProperty("/middleName", "${field.value}");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("Jane", "X", "Doe");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 1, 2, 1);
-    }
-
-    @Test
-    public void testDetectDuplicatesBloomFilter() {
-        runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE);
-        runner.setProperty(BLOOM_FILTER_FPP, "0.10");
-        runner.setProperty("/middleName", "${field.value}");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("Jane", "X", "Doe");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 1, 2, 1);
-    }
-
-    @Test
-    public void testNoDuplicatesHashSet() {
-        runner.setProperty(FILTER_TYPE, HASH_SET_VALUE);
-        runner.setProperty("/middleName", "${field.value}");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("Jack", "Z", "Brown");
-        reader.addRecord("Jane", "X", "Doe");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 1, 3, 0);
-    }
-
-    @Test
-    public void testNoDuplicatesBloomFilter() {
-        runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE);
-        runner.setProperty(BLOOM_FILTER_FPP, "0.10");
-        runner.setProperty("/middleName", "${field.value}");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("Jack", "Z", "Brown");
-        reader.addRecord("Jane", "X", "Doe");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 1, 3, 0);
-    }
-
-    @Test
-    public void testAllDuplicates() {
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("John", "Q", "Smith");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 0, 1, 2);
-    }
-
-    @Test
-    public void testAllUnique() {
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("Jack", "Z", "Brown");
-        reader.addRecord("Jane", "X", "Doe");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 1, 3, 0);
-    }
-
-
-
-    @Test
-    public void testCacheValueFromRecordPath() {
-        runner.setProperty(CACHE_ENTRY_IDENTIFIER, "Users");
-        reader.addRecord("John", "Q", "Smith");
-        reader.addRecord("Jack", "Z", "Brown");
-        reader.addRecord("Jane", "X", "Doe");
-
-        runner.enqueue("");
-        runner.run();
-
-        doCountTests(0, 1, 1, 1, 2, 1);
-
-        cache.assertContains("KEY", "VALUE"); // TODO: Get the tests running so you can see what the key/value is in serialized form
-    }
-
-    void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) {
-        runner.assertTransferCount(REL_DUPLICATE, duplicates);
-        runner.assertTransferCount(REL_NON_DUPLICATE, notDuplicates);
-        runner.assertTransferCount(REL_ORIGINAL, original);
-        runner.assertTransferCount(REL_FAILURE, failure);
-
-        List<MockFlowFile> duplicateFlowFile = runner.getFlowFilesForRelationship(REL_DUPLICATE);
-        if (duplicateFlowFile != null) {
-            assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count"));
-        }
-
-        List<MockFlowFile> nonDuplicateFlowFile = runner.getFlowFilesForRelationship(REL_NON_DUPLICATE);
-        if (nonDuplicateFlowFile != null) {
-            assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count"));
-        }
-    }
-}