Posted to commits@nifi.apache.org by ma...@apache.org on 2022/03/10 00:08:01 UTC
[nifi] 02/02: NIFI-6047 Cleaned up code to allow tests to run against 1.13.0-snapshot Removed DMC. NIFI-6047 Started integrating changes from NIFI-6014. NIFI-6047 Added DMC tests. NIFI-6047 Added cache identifier recordpath test. NIFI-6047 Added additional details. NIFI-6047 Removed old additional details. NIFI-6047 made some changes requested in a follow up review. NIFI-6047 latest. NIFI-6047 Finished updates First round of code review cleanup Latest Removed EL from the dynamic properties. Finished cod [...]
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit df00cc6cb576c11ae3ef0f1c6f64454598298936
Author: Mike Thomsen <mt...@apache.org>
AuthorDate: Thu Oct 29 13:52:12 2020 -0400
NIFI-6047 Cleaned up code to allow tests to run against 1.13.0-snapshot
Removed DMC.
NIFI-6047 Started integrating changes from NIFI-6014.
NIFI-6047 Added DMC tests.
NIFI-6047 Added cache identifier recordpath test.
NIFI-6047 Added additional details.
NIFI-6047 Removed old additional details.
NIFI-6047 made some changes requested in a follow up review.
NIFI-6047 latest.
NIFI-6047 Finished updates
First round of code review cleanup
Latest
Removed EL from the dynamic properties.
Finished code review requested refactoring.
Checkstyle fix.
Removed a Java 11 API
NIFI-6047 Renamed processor to DeduplicateRecord
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #4646
---
...DuplicateRecord.java => DeduplicateRecord.java} | 562 +++++++++++----------
.../services/org.apache.nifi.processor.Processor | 2 +-
.../additionalDetails.html | 70 +++
.../DetectDuplicateRecord/additionalDetails.html | 96 ----
.../processors/standard/MockCacheService.groovy | 77 ---
.../processors/standard/TestDeduplicateRecord.java | 321 ++++++++++++
.../standard/TestDetectDuplicateRecord.java | 209 --------
7 files changed, 682 insertions(+), 655 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java
similarity index 53%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java
rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java
index 191a675..b055a75 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DeduplicateRecord.java
@@ -22,94 +22,111 @@ import com.google.common.hash.Funnels;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.codec.digest.MessageDigestAlgorithms;
-import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.*;
-import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
-import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import org.apache.nifi.distributed.cache.client.exception.SerializationException;
-import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.*;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
-import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
import org.apache.nifi.record.path.validation.RecordPathValidator;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.*;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import java.io.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.codec.binary.StringUtils.getBytesUtf8;
-import static org.apache.commons.lang3.StringUtils.*;
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@SystemResourceConsideration(resource = SystemResource.MEMORY,
- description = "The HashSet filter type will grow memory space proportionate to the number of unique records processed. " +
- "The BloomFilter type will use constant memory regardless of the number of records processed.")
-@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique",
- "filter", "hash", "dupe", "duplicate", "dedupe"})
-@CapabilityDescription("Caches records from each incoming FlowFile and determines if the record " +
- "has already been seen. If so, routes the record to 'duplicate'. If the record is " +
- "not determined to be a duplicate, it is routed to 'non-duplicate'."
+ description = "The HashSet filter type will grow in memory proportionally to the number of unique records processed. " +
+ "The BloomFilter type will use constant memory regardless of the number of records processed.")
+@SystemResourceConsideration(resource = SystemResource.CPU,
+ description = "If a more advanced hash algorithm is chosen, the amount of time required to hash any particular " +
+ "record could increase substantially."
)
+@Tags({"text", "record", "update", "change", "replace", "modify", "distinct", "unique",
+ "filter", "hash", "dupe", "duplicate", "dedupe"})
+@CapabilityDescription("This processor attempts to deduplicate a record set in memory using either a hashset or a bloom filter. " +
+ "It operates on a per-file basis rather than across an entire data set that spans multiple files.")
@WritesAttribute(attribute = "record.count", description = "The number of records processed.")
@DynamicProperty(
- name = "RecordPath",
- value = "An expression language statement used to determine how the RecordPath is resolved. " +
- "The following variables are availble: ${field.name}, ${field.value}, ${field.type}",
- description = "The name of each user-defined property must be a valid RecordPath.")
+ name = "RecordPath",
+ value = "An expression language statement used to determine how the RecordPath is resolved. " +
+ "The following variables are availible: ${field.name}, ${field.value}, ${field.type}",
+ description = "The name of each user-defined property must be a valid RecordPath.")
@SeeAlso(classNames = {
- "org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
- "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
- "org.apache.nifi.processors.standard.DetectDuplicate"
+ "org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
+ "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
+ "org.apache.nifi.processors.standard.DetectDuplicate"
})
-public class DetectDuplicateRecord extends AbstractProcessor {
+public class DeduplicateRecord extends AbstractProcessor {
+ public static final char JOIN_CHAR = '~';
private static final String FIELD_NAME = "field.name";
private static final String FIELD_VALUE = "field.value";
private static final String FIELD_TYPE = "field.type";
private volatile RecordPathCache recordPathCache;
- private volatile List<String> recordPaths;
+ private volatile List<PropertyDescriptor> dynamicProperties;
// VALUES
static final AllowableValue NONE_ALGORITHM_VALUE = new AllowableValue("none", "None",
"Do not use a hashing algorithm. The value of resolved RecordPaths will be combined with tildes (~) to form the unique record key. " +
"This may use significantly more storage depending on the size and shape or your data.");
- static final AllowableValue MD5_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.MD5, "MD5",
- "The MD5 message-digest algorithm.");
- static final AllowableValue SHA1_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_1, "SHA-1",
- "The SHA-1 cryptographic hash algorithm.");
- static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_256, "SHA-256",
+ static final AllowableValue SHA256_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_256, "SHA-256",
"The SHA-256 cryptographic hash algorithm.");
- static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA3_512, "SHA-512",
+ static final AllowableValue SHA512_ALGORITHM_VALUE = new AllowableValue(MessageDigestAlgorithms.SHA_512, "SHA-512",
"The SHA-512 cryptographic hash algorithm.");
static final AllowableValue HASH_SET_VALUE = new AllowableValue("hash-set", "HashSet",
@@ -139,56 +156,54 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.required(true)
.build();
- static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
- .name("include-zero-record-flowfiles")
- .displayName("Include Zero Record FlowFiles")
- .description("When converting an incoming FlowFile, if the conversion results in no data, "
- + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues("true", "false")
- .defaultValue("true")
- .required(true)
- .build();
-
- static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
- .name("cache-the-entry-identifier")
- .displayName("Cache The Entry Identifier")
- .description("When true this cause the processor to check for duplicates and cache the Entry Identifier. When false, "
- + "the processor would only check for duplicates and not cache the Entry Identifier, requiring another "
- + "processor to add identifiers to the distributed cache.")
+ static final AllowableValue OPTION_SINGLE_FILE = new AllowableValue("single", "Single File");
+ static final AllowableValue OPTION_MULTIPLE_FILES = new AllowableValue("multiple", "Multiple Files");
+
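+ // Single File deduplicates in memory within one FlowFile; Multiple Files consults a
+ // distributed map cache so duplicates can be detected across FlowFiles.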
+ static final PropertyDescriptor DEDUPLICATION_STRATEGY = new PropertyDescriptor.Builder()
+ .name("deduplication-strategy")
+ .displayName("Deduplication Strategy")
+ .description("The strategy to use for detecting and isolating duplicate records. The option for doing it " +
+ "across a single data file will operate in memory, whereas the one for going across the enter repository " +
+ "will require a distributed map cache.")
+ .allowableValues(OPTION_SINGLE_FILE, OPTION_MULTIPLE_FILES)
+ .defaultValue(OPTION_SINGLE_FILE.getValue())
.required(true)
- .allowableValues("true", "false")
- .defaultValue("true")
.build();
- static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
- .name("distributed-cache-service")
- .displayName("Distributed Cache Service")
- .description("The Controller Service that is used to cache unique records, used to determine duplicates")
- .required(false)
+ static final PropertyDescriptor DISTRIBUTED_MAP_CACHE = new PropertyDescriptor.Builder()
+ .name("distributed-map-cache")
+ .displayName("Distributed Map Cache client")
+ .description("This configuration is required when the deduplication strategy is set to 'multiple files.' The map " +
+ "cache will be used to check a data source such as HBase or Redis for entries indicating that a record has " +
+ "been processed before. This option requires a downstream process that uses PutDistributedMapCache to write " +
+ "an entry to the cache data source once the record has been processed to indicate that it has been handled before.")
.identifiesControllerService(DistributedMapCacheClient.class)
+ .required(false)
+ .addValidator(Validator.VALID)
+ .dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES)
.build();
- static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
- .name("cache-entry-identifier")
- .displayName("Cache Entry Identifier")
- .description(
- "A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated " +
- "against a FlowFile in order to determine the cached filter type value used to identify duplicates.")
- .required(false)
- .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
- .defaultValue("${hash.value}")
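+ // The computed record hash is exposed to this record path as the EL attribute "record.hash.value",
+ // e.g. (illustrative): concat('${some.var}', '-', '${record.hash.value}')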
+ static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder()
+ .name("cache-identifier")
+ .displayName("Cache Identifier")
+ .description("This option defines a record path operation to use for defining the cache identifier. It can be used " +
+ "in addition to the hash settings. This field will have the expression language attribute \"record.hash.value\" " +
+ "available to it to use with it to generate the record path operation.")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .addValidator(Validator.VALID)
+ .dependsOn(DEDUPLICATION_STRATEGY, OPTION_MULTIPLE_FILES)
.build();
- static final PropertyDescriptor AGE_OFF_DURATION = new PropertyDescriptor.Builder()
- .name("age-off-duration")
- .displayName("Age Off Duration")
- .description("Time interval to age off cached filter entries. When the cache expires, the entire filter and its values " +
- "are destroyed. Leaving this value empty will cause the cached entries to never expire but may eventually be rotated " +
- "out when the cache servers rotation policy automatically expires entries.")
- .required(false)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
+ .name("include-zero-record-flowfiles")
+ .displayName("Include Zero Record FlowFiles")
+ .description("When converting an incoming FlowFile, if the conversion results in no data, "
+ + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
.build();
static final PropertyDescriptor RECORD_HASHING_ALGORITHM = new PropertyDescriptor.Builder()
@@ -197,12 +212,10 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.description("The algorithm used to hash the combined set of resolved RecordPath values for cache storage.")
.allowableValues(
NONE_ALGORITHM_VALUE,
- MD5_ALGORITHM_VALUE,
- SHA1_ALGORITHM_VALUE,
SHA256_ALGORITHM_VALUE,
SHA512_ALGORITHM_VALUE
)
- .defaultValue(SHA1_ALGORITHM_VALUE.getValue())
+ .defaultValue(SHA256_ALGORITHM_VALUE.getValue())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -210,12 +223,16 @@ public class DetectDuplicateRecord extends AbstractProcessor {
static final PropertyDescriptor FILTER_TYPE = new PropertyDescriptor.Builder()
.name("filter-type")
.displayName("Filter Type")
- .description("The filter used to determine whether a record has been seen before based on the matching RecordPath criteria.")
+ .description("The filter used to determine whether a record has been seen before based on the matching RecordPath " +
+ "criteria. If hash set is selected, a Java HashSet object will be used to deduplicate all encountered " +
+ "records. If the bloom filter option is selected, a bloom filter will be used. The bloom filter option is " +
+ "less memory intensive, but has a chance of having false positives.")
.allowableValues(
HASH_SET_VALUE,
BLOOM_FILTER_VALUE
)
.defaultValue(HASH_SET_VALUE.getValue())
+ .dependsOn(DEDUPLICATION_STRATEGY, OPTION_SINGLE_FILE)
.required(true)
.build();
@@ -227,6 +244,7 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.defaultValue("25000")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .dependsOn(FILTER_TYPE, BLOOM_FILTER_VALUE)
.required(true)
.build();
@@ -269,20 +287,15 @@ public class DetectDuplicateRecord extends AbstractProcessor {
private Set<Relationship> relationships;
- private final Serializer<String> keySerializer = new StringSerializer();
- private final Serializer<CacheValue> cacheValueSerializer = new CacheValueSerializer();
- private final Deserializer<CacheValue> cacheValueDeserializer = new CacheValueDeserializer();
-
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(DEDUPLICATION_STRATEGY);
+ descriptors.add(DISTRIBUTED_MAP_CACHE);
+ descriptors.add(CACHE_IDENTIFIER);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(INCLUDE_ZERO_RECORD_FLOWFILES);
- descriptors.add(CACHE_IDENTIFIER);
- descriptors.add(CACHE_ENTRY_IDENTIFIER);
- descriptors.add(AGE_OFF_DURATION);
- descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(RECORD_HASHING_ALGORITHM);
descriptors.add(FILTER_TYPE);
descriptors.add(FILTER_CAPACITY_HINT);
@@ -318,24 +331,20 @@ public class DetectDuplicateRecord extends AbstractProcessor {
"to access information about the field and the value of the field being evaluated.")
.required(false)
.dynamic(true)
+ .addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new RecordPathPropertyNameValidator())
.build();
}
@Override
- protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ protected Collection<ValidationResult> customValidate(final ValidationContext context) {
RecordPathValidator recordPathValidator = new RecordPathValidator();
- final List<ValidationResult> validationResults = validationContext.getProperties().keySet().stream()
- .filter(PropertyDescriptor::isDynamic)
- .map(property -> recordPathValidator.validate(
- "User-defined Properties",
- property.getName(),
- validationContext
- )).collect(Collectors.toList());
-
- if(validationContext.getProperty(BLOOM_FILTER_FPP).isSet()) {
- final double falsePositiveProbability = validationContext.getProperty(BLOOM_FILTER_FPP).asDouble();
+ List<ValidationResult> validationResults = new ArrayList<>();
+
+ boolean useSingleFile = context.getProperty(DEDUPLICATION_STRATEGY).getValue().equals(OPTION_SINGLE_FILE.getValue());
+
+ if (useSingleFile && context.getProperty(BLOOM_FILTER_FPP).isSet()) {
+ final double falsePositiveProbability = context.getProperty(BLOOM_FILTER_FPP).asDouble();
if (falsePositiveProbability < 0 || falsePositiveProbability > 1) {
validationResults.add(
new ValidationResult.Builder()
@@ -344,42 +353,62 @@ public class DetectDuplicateRecord extends AbstractProcessor {
.explanation("Valid values are 0.0 - 1.0 inclusive")
.valid(false).build());
}
- }
-
- if(validationContext.getProperty(CACHE_IDENTIFIER).asBoolean()) {
- if(!validationContext.getProperty(DISTRIBUTED_CACHE_SERVICE).isSet())
+ } else if (!useSingleFile) {
+ if (!context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) {
validationResults.add(new ValidationResult.Builder()
- .subject(DISTRIBUTED_CACHE_SERVICE.getName())
- .explanation(DISTRIBUTED_CACHE_SERVICE.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
- .valid(false).build());
-
- if(!validationContext.getProperty(CACHE_ENTRY_IDENTIFIER).isSet())
- validationResults.add(new ValidationResult.Builder()
- .subject(CACHE_ENTRY_IDENTIFIER.getName())
- .explanation(CACHE_ENTRY_IDENTIFIER.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
- .valid(false).build());
-
- if(!validationContext.getProperty(AGE_OFF_DURATION).isSet())
- validationResults.add(new ValidationResult.Builder()
- .subject(AGE_OFF_DURATION.getName())
- .explanation(AGE_OFF_DURATION.getName() + " is required when " + CACHE_IDENTIFIER.getName() + " is true.")
+ .subject(DISTRIBUTED_MAP_CACHE.getName())
+ .explanation("Multiple files deduplication was chosen, but a distributed map cache client was " +
+ "not configured")
.valid(false).build());
+ }
}
return validationResults;
}
- @OnScheduled
- public void compileRecordPaths(final ProcessContext context) {
- final List<String> recordPaths = new ArrayList<>();
+ private DistributedMapCacheClient mapCacheClient;
+ private RecordReaderFactory readerFactory;
+ private RecordSetWriterFactory writerFactory;
- recordPaths.addAll(context.getProperties().keySet().stream()
+ private boolean useInMemoryStrategy;
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ dynamicProperties = context.getProperties().keySet().stream()
.filter(PropertyDescriptor::isDynamic)
- .map(PropertyDescriptor::getName)
- .collect(toList()));
+ .collect(Collectors.toList());
+
+ int cacheSize = dynamicProperties.size();
+
+ recordPathCache = new RecordPathCache(cacheSize);
- recordPathCache = new RecordPathCache(recordPaths.size());
- this.recordPaths = recordPaths;
+ if (context.getProperty(DISTRIBUTED_MAP_CACHE).isSet()) {
+ mapCacheClient = context.getProperty(DISTRIBUTED_MAP_CACHE).asControllerService(DistributedMapCacheClient.class);
+ }
+
+ readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ String strategy = context.getProperty(DEDUPLICATION_STRATEGY).getValue();
+
+ useInMemoryStrategy = strategy.equals(OPTION_SINGLE_FILE.getValue());
+ }
+
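+ // Chooses the in-memory filter (HashSet or BloomFilter) for the single-file strategy,
+ // or wraps the distributed map cache client for the multiple-files strategy.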
+ private FilterWrapper getFilter(ProcessContext context) {
+ if (useInMemoryStrategy) {
+ boolean useHashSet = context.getProperty(FILTER_TYPE).getValue()
+ .equals(HASH_SET_VALUE.getValue());
+ final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger();
+ return useHashSet
+ ? new HashSetFilterWrapper(new HashSet<>(filterCapacity))
+ : new BloomFilterWrapper(BloomFilter.create(
+ Funnels.stringFunnel(Charset.defaultCharset()),
+ filterCapacity,
+ context.getProperty(BLOOM_FILTER_FPP).asDouble()
+ ));
+ } else {
+ return new DistributedMapCacheClientWrapper(mapCacheClient);
+ }
}
@Override
@@ -390,44 +419,29 @@ public class DetectDuplicateRecord extends AbstractProcessor {
}
final ComponentLog logger = getLogger();
- final String cacheKey = context.getProperty(CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
-
- if (isBlank(cacheKey)) {
- logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
- session.transfer(session.penalize(flowFile), REL_FAILURE);
- return;
- }
FlowFile nonDuplicatesFlowFile = session.create(flowFile);
FlowFile duplicatesFlowFile = session.create(flowFile);
- try {
- final long now = System.currentTimeMillis();
- final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-
- final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean();
- final int filterCapacity = context.getProperty(FILTER_CAPACITY_HINT).asInteger();
- Serializable serializableFilter = context.getProperty(FILTER_TYPE).getValue()
- .equals(context.getProperty(HASH_SET_VALUE.getValue()))
- ? new HashSet<String>(filterCapacity)
- : BloomFilter.create(
- Funnels.stringFunnel(Charset.defaultCharset()),
- filterCapacity,
- context.getProperty(BLOOM_FILTER_FPP).asDouble());
-
- if(shouldCacheIdentifier && cache.containsKey(cacheKey, keySerializer)) {
- CacheValue cacheValue = cache.get(cacheKey, keySerializer, cacheValueDeserializer);
- Long durationMS = context.getProperty(AGE_OFF_DURATION).asTimePeriod(TimeUnit.MILLISECONDS);
-
- if(durationMS != null && (now >= cacheValue.getEntryTimeMS() + durationMS)) {
- boolean status = cache.remove(cacheKey, keySerializer);
- logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status});
- } else {
- serializableFilter = cacheValue.getFilter();
- }
- }
-
- final FilterWrapper filter = FilterWrapper.create(serializableFilter);
+ long index = 0;
+
+ WriteResult nonDuplicatesWriteResult = null;
+ WriteResult duplicatesWriteResult = null;
+ String duplicateMimeType = null;
+ String nonDuplicateMimeType = null;
+
+ boolean error = false;
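+ // Read the records once and write to two streams in a single pass, producing
+ // one FlowFile of non-duplicates and one of duplicates.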
+ try (
+ final InputStream inputStream = session.read(flowFile);
+ final RecordReader reader = readerFactory.createRecordReader(flowFile, inputStream, logger);
+ final OutputStream nonDupeStream = session.write(nonDuplicatesFlowFile);
+ final OutputStream dupeStream = session.write(duplicatesFlowFile);
+ final RecordSetWriter nonDuplicatesWriter = writerFactory
+ .createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), nonDupeStream, nonDuplicatesFlowFile);
+ final RecordSetWriter duplicatesWriter = writerFactory
+ .createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), dupeStream, duplicatesFlowFile);
+ ) {
+ final FilterWrapper filter = getFilter(context);
final String recordHashingAlgorithm = context.getProperty(RECORD_HASHING_ALGORITHM).getValue();
final MessageDigest messageDigest = recordHashingAlgorithm.equals(NONE_ALGORITHM_VALUE.getValue())
@@ -435,14 +449,6 @@ public class DetectDuplicateRecord extends AbstractProcessor {
: DigestUtils.getDigest(recordHashingAlgorithm);
final Boolean matchWholeRecord = context.getProperties().keySet().stream().noneMatch(p -> p.isDynamic());
- final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordReader reader = readerFactory.createRecordReader(flowFile.getAttributes(), session.read(flowFile), logger);
-
- final RecordSchema writeSchema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
- final RecordSetWriter nonDuplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(nonDuplicatesFlowFile));
- final RecordSetWriter duplicatesWriter = writerFactory.createWriter(getLogger(), writeSchema, session.write(duplicatesFlowFile));
-
nonDuplicatesWriter.beginRecordSet();
duplicatesWriter.beginRecordSet();
Record record;
@@ -451,114 +457,125 @@ public class DetectDuplicateRecord extends AbstractProcessor {
String recordValue;
if (matchWholeRecord) {
- recordValue = Joiner.on('~').join(record.getValues());
+ recordValue = Joiner.on(JOIN_CHAR).join(record.getValues());
} else {
- final List<String> fieldValues = new ArrayList<>();
- for (final String recordPathText : recordPaths) {
- final PropertyValue recordPathPropertyValue = context.getProperty(recordPathText);
- final RecordPath recordPath = recordPathCache.getCompiled(recordPathText);
- final RecordPathResult result = recordPath.evaluate(record);
- final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
-
- if(recordPathPropertyValue.isExpressionLanguagePresent()) {
- final Map<String, String> fieldVariables = new HashMap<>();
- selectedFields.forEach(fieldVal -> {
- fieldVariables.clear();
- fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
- fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
- fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
-
- fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue());
- });
- } else {
- fieldValues.add(recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue());
- }
-
- fieldValues.addAll(selectedFields.stream()
- .map(f -> recordPathPropertyValue.evaluateAttributeExpressions(flowFile).getValue())
- .collect(toList())
- );
- }
- recordValue = Joiner.on('~').join(fieldValues);
+ recordValue = executeDynamicRecordPaths(context, record, flowFile);
}
- final String recordHash = messageDigest != null
+ String recordHash = messageDigest != null
? Hex.encodeHexString(messageDigest.digest(getBytesUtf8(recordValue)))
: recordValue;
+ if (messageDigest != null) {
+ messageDigest.reset();
+ }
+
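+ // For the multiple-files strategy, an optional Cache Identifier record path can replace the
+ // computed hash; the hash is made available to it as the "record.hash.value" attribute.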
+ if (!useInMemoryStrategy && context.getProperty(CACHE_IDENTIFIER).isSet()) {
+ Map<String, String> additional = new HashMap<>();
+ additional.put("record.hash.value", recordHash);
+ String rawPath = context.getProperty(CACHE_IDENTIFIER).evaluateAttributeExpressions(flowFile, additional).getValue();
+ RecordPath compiled = recordPathCache.getCompiled(rawPath);
+ RecordPathResult result = compiled.evaluate(record);
+ FieldValue fieldValue = result.getSelectedFields().findFirst().get();
+ if (fieldValue.getValue() == null) {
+ throw new ProcessException(String.format("The path \"%s\" failed to create an ID value at record index %d", rawPath, index));
+ }
+
+ recordHash = fieldValue.getValue().toString();
+ }
- if(filter.contains(recordHash)) {
+ if (filter.contains(recordHash)) {
duplicatesWriter.write(record);
} else {
nonDuplicatesWriter.write(record);
+ filter.put(recordHash);
}
- filter.put(recordHash);
+ index++;
}
- final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).isSet()
- ? context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean()
- : true;
-
+ duplicateMimeType = duplicatesWriter.getMimeType();
+ nonDuplicateMimeType = nonDuplicatesWriter.getMimeType();
// Route Non-Duplicates FlowFile
- final WriteResult nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet();
- nonDuplicatesWriter.close();
- Map<String, String> attributes = new HashMap<>();
- attributes.putAll(nonDuplicatesWriteResult.getAttributes());
- attributes.put("record.count", String.valueOf(nonDuplicatesWriteResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), nonDuplicatesWriter.getMimeType());
- nonDuplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes);
- logger.info("Successfully found {} unique records for {}", new Object[] {nonDuplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile});
+ nonDuplicatesWriteResult = nonDuplicatesWriter.finishRecordSet();
+ // Route Duplicates FlowFile
+ duplicatesWriteResult = duplicatesWriter.finishRecordSet();
- if(!includeZeroRecordFlowFiles && nonDuplicatesWriteResult.getRecordCount() == 0) {
- session.remove(nonDuplicatesFlowFile);
+ } catch (final Exception e) {
+ logger.error("Failed in detecting duplicate records at index " + index, e);
+ error = true;
+ } finally {
+ if (!error) {
+ final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean();
+
+ session.adjustCounter("Records Processed",
+ nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false);
+
+ sendOrRemove(session, duplicatesFlowFile, REL_DUPLICATE, duplicateMimeType,
+ includeZeroRecordFlowFiles, duplicatesWriteResult);
+
+ sendOrRemove(session, nonDuplicatesFlowFile, REL_NON_DUPLICATE, nonDuplicateMimeType,
+ includeZeroRecordFlowFiles, nonDuplicatesWriteResult);
+
+ session.transfer(flowFile, REL_ORIGINAL);
} else {
- session.transfer(nonDuplicatesFlowFile, REL_NON_DUPLICATE);
+ session.remove(duplicatesFlowFile);
+ session.remove(nonDuplicatesFlowFile);
+ session.transfer(flowFile, REL_FAILURE);
}
+ }
+ }
- // Route Duplicates FlowFile
- final WriteResult duplicatesWriteResult = duplicatesWriter.finishRecordSet();
- duplicatesWriter.close();
- attributes.clear();
- attributes.putAll(duplicatesWriteResult.getAttributes());
- attributes.put("record.count", String.valueOf(duplicatesWriteResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), duplicatesWriter.getMimeType());
- duplicatesFlowFile = session.putAllAttributes(nonDuplicatesFlowFile, attributes);
- logger.info("Successfully found {} duplicate records for {}", new Object[] {duplicatesWriteResult.getRecordCount(), nonDuplicatesFlowFile});
-
- if(!includeZeroRecordFlowFiles && duplicatesWriteResult.getRecordCount() == 0) {
- session.remove(duplicatesFlowFile);
- } else {
- session.transfer(duplicatesFlowFile, REL_DUPLICATE);
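+ // Either removes an empty output FlowFile (when zero-record FlowFiles are excluded) or
+ // updates its record.count and mime.type attributes and transfers it to the target relationship.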
+ private void sendOrRemove(ProcessSession session,
+ FlowFile outputFlowFile,
+ Relationship targetRelationship,
+ String mimeType,
+ boolean includeZeroRecordFlowFiles,
+ WriteResult writeResult) {
+ if (!includeZeroRecordFlowFiles && writeResult.getRecordCount() == 0) {
+ session.remove(outputFlowFile);
+ } else {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+ outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Successfully found {} unique records for {}",
+ writeResult.getRecordCount(), outputFlowFile);
}
- session.adjustCounter("Records Processed",
- nonDuplicatesWriteResult.getRecordCount() + duplicatesWriteResult.getRecordCount(), false);
+ session.transfer(outputFlowFile, targetRelationship);
+ }
+ }
- if(shouldCacheIdentifier) {
- CacheValue cacheValue = new CacheValue(serializableFilter, now);
- cache.put(cacheKey, cacheValue, keySerializer, cacheValueSerializer);
- }
+ private String executeDynamicRecordPaths(ProcessContext context, Record record, FlowFile flowFile) {
+ final List<String> fieldValues = new ArrayList<>();
+ for (final PropertyDescriptor propertyDescriptor : dynamicProperties) {
+ final String value = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath recordPath = recordPathCache.getCompiled(value);
+ final RecordPathResult result = recordPath.evaluate(record);
+ final List<FieldValue> selectedFields = result.getSelectedFields().collect(Collectors.toList());
- session.transfer(flowFile, REL_ORIGINAL);
+ fieldValues.add(propertyDescriptor.getName());
- } catch (final Exception e) {
- logger.error("Failed in detecting duplicate records.", e);
- session.remove(duplicatesFlowFile);
- session.remove(nonDuplicatesFlowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
+ fieldValues.addAll(selectedFields.stream()
+ .map(f -> f.getValue().toString())
+ .collect(toList())
+ );
}
+
+ return Joiner.on(JOIN_CHAR).join(fieldValues);
}
private abstract static class FilterWrapper {
public static FilterWrapper create(Object filter) {
- if(filter instanceof HashSet) {
+ if (filter instanceof HashSet) {
return new HashSetFilterWrapper((HashSet<String>) filter);
} else {
return new BloomFilterWrapper((BloomFilter<String>) filter);
}
}
+
public abstract boolean contains(String value);
+
public abstract void put(String value);
}
@@ -600,6 +617,34 @@ public class DetectDuplicateRecord extends AbstractProcessor {
}
}
+ private static class DistributedMapCacheClientWrapper extends FilterWrapper {
+ private DistributedMapCacheClient client;
+
+ public DistributedMapCacheClientWrapper(DistributedMapCacheClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public boolean contains(String value) {
+ try {
+ return client.containsKey(value, STRING_SERIALIZER);
+ } catch (IOException e) {
+ throw new ProcessException("Distributed Map lookup failed", e);
+ }
+ }
+
+ @Override
+ public void put(String value) {
+ /*
+ * This needs to be a no-op because this processor will be used upstream of the systems
+ * that would write the records that power the map cache.
+ */
+ }
+ }
+
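+ // Lambda-based serializers for cache keys and values, replacing the removed serializer classes.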
+ private static final Serializer<String> STRING_SERIALIZER = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+ private static final Serializer<Boolean> BOOLEAN_SERIALIZER = (value, output) -> output.write((byte) (value ? 1 : 0));
+
private static class CacheValue implements Serializable {
private final Serializable filter;
@@ -618,31 +663,4 @@ public class DetectDuplicateRecord extends AbstractProcessor {
return entryTimeMS;
}
}
-
- private static class CacheValueSerializer implements Serializer<CacheValue> {
- @Override
- public void serialize(CacheValue cacheValue, OutputStream outputStream) throws SerializationException, IOException {
- new ObjectOutputStream(outputStream).writeObject(cacheValue);
- }
- }
-
- private static class CacheValueDeserializer implements Deserializer<CacheValue> {
- @Override
- public CacheValue deserialize(byte[] bytes) throws DeserializationException, IOException {
- try {
- return (CacheValue) new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- return null;
- }
- }
-
- private static class StringSerializer implements Serializer<String> {
-
- @Override
- public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
- out.write(value.getBytes(StandardCharsets.UTF_8));
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0324a5b..160d552 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -26,7 +26,7 @@ org.apache.nifi.processors.standard.CryptographicHashAttribute
org.apache.nifi.processors.standard.CryptographicHashContent
org.apache.nifi.processors.standard.DebugFlow
org.apache.nifi.processors.standard.DetectDuplicate
-org.apache.nifi.processors.standard.DetectDuplicateRecord
+org.apache.nifi.processors.standard.DeduplicateRecord
org.apache.nifi.processors.standard.DistributeLoad
org.apache.nifi.processors.standard.DuplicateFlowFile
org.apache.nifi.processors.standard.EncryptContent
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html
new file mode 100644
index 0000000..07b1f64
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.DeduplicateRecords/additionalDetails.html
@@ -0,0 +1,70 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8"/>
+ <title>DeduplicateRecords</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+ <h1>Overview</h1>
+ <p>This processor provides deduplication across a single record set file, across several files, or even across an entire data lake
+ using a DistributedMapCacheClient controller service. For single-file deduplication, it uses either a HashSet or a bloom
+ filter to provide extremely fast in-memory calculations with a high degree of accuracy. For deduplication across multiple files, it
+ uses the controller service to compare a generated hash against a map cache stored in one of the supported caching
+ options that Apache NiFi offers.</p>
+
+ <h2>Configuring single file deduplication</h2>
+ <p>Choose the "single file" option under the configuration property labeled "Deduplication Strategy." Then choose
+ whether to use a bloom filter or hash set. Be mindful to set size limits that are in line with the average size of the
+ record sets that you process.</p>
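+ <p>As an illustrative sketch (property display names may differ slightly), a single file setup might be:</p>
+ <pre>
+ Deduplication Strategy: Single File
+ Filter Type: BloomFilter
+ Filter Capacity Hint: 25000
+ Bloom Filter FPP: 0.10
+ </pre>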
+
+ <h2>Configuring multi-file deduplication</h2>
+ <p>Select the "Multiple Files" option under "Deduplication Strategy" and then configure a DistributedMapCacheClient service.
+ It is possible to configure a cache identifier in multiple ways:</p>
+ <ol>
+ <li>Generate a hash of the entire record by specifying no dynamic properties.</li>
+ <li>Generate a hash using dynamic properties to specify particular fields to use.</li>
+ <li>Manually specify a single record path statement in the cache identifier property. Note:
+ <ul>
+ <li>This can be chained with #1 and #2 because it supports expression language and exposes the computed
+ hash from #1 or #2 as the EL variable <em>record.hash.value</em>. Example:
+ <em>concat('${some.var}', '-', '${record.hash.value}')</em>
+ </li>
+ </ul>
+ </li>
+ </ol>
+ <h2>The role of dynamic properties</h2>
+ <p>Dynamic properties should have a human-readable name for the property name and a record path operation for the
+ value. The record path operations will be used to extract values from the record to assemble a unique identifier. Here is an example:</p>
+ <ul>
+ <li>firstName => /firstName</li>
+ <li>lastName => /lastName</li>
+ </ul>
+ <p>Record:</p>
+ <pre>
+ {
+ "firstName": "John",
+ "lastName": "Smith"
+ }
+ </pre>
+ <p>This will yield an identifier that has "John" and "Smith" in it before a hash is generated from the final value.</p>
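+ <p>Internally, the dynamic property names and the matched field values are joined with a tilde (~)
+ before hashing, so the combined value here would look like <em>firstName~John~lastName~Smith</em>.</p>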
+ <p>If any record path fails to match, an exception will be raised and the FlowFile will be sent to the
+ failure relationship.</p>
+</body>
+</html>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html
deleted file mode 100644
index 8f24302..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org/apache/nifi/processors/standard/DetectDuplicateRecord/additionalDetails.html
+++ /dev/null
@@ -1,96 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<head>
- <meta charset="utf-8" />
- <title>DetectDuplicateRecord</title>
-
- <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
-</head>
-<body>
- <p>This processor makes use of the NiFi RecordPath Domain-Specific Language (DSL) to allow the user to
- indicate which field(s) in the Record should be used to determine uniqueness. Users do this by adding
- a User-defined Property to the Processor's configuration. The name of the User-defined Property must
- be the RecordPath text that should be evaluated against each Record. All of the values identified by
- the record paths are hashed together in the order they were specified to derive a unique value
- representing a single Record. This hashed value is then optionally stored in the cache for
- subsequent FlowFile processing.</p>
- <p>If a RecordPath is given and does not match any field in an input Record, that Property will be
- skipped and all other Properties will still be evaluated. If the RecordPath matches no fields the
- record will be routed to the 'non-duplicate' relationship. If no User-defined Properties specifying
- a RecordPath are defined, all field values of the record will be used.</p>
- <p>After all RecordPath values are resolved, the values are combined in the order of the User-defined
- Properties and hashed together using the specified hashing algorithm, ensuring constant space per record.</p>
-
- <h2>Choosing a Filter Type</h2>
- <p></p>
- <h2>Examples</h2>
- <p>Below, we lay out some examples in order to provide clarity about the Processor's behavior.
- For all of the examples below, consider the example to operate on the following set of 2 (JSON) records:</p>
- <code>
- <pre>
- [
- {
- "id": 1,
- "name": "John",
- "gender": "M",
- },
- {
- "id": 2,
- "name": "Susan",
- "gender": "F",
- },
- {
- "id": 3,
- "name": "Megan",
- "gender": "F",
- },
- {
- "id": 2,
- "name": "Jerry",
- "gender": "M",
- },
- ]
- </pre>
- </code>
-
- <h3>Example 1: Matching on a Single Record Field</h3>
- <p>A valid property RecordPath mapping would be <em>/id => ${field.value}</em>.</p>
- <p>For a record set with JSON like that, the records will be evaluated against the <code>id</code> field
- to determine uniqueness.</p>
- <ul>
- <li><strong>non-duplicate:</strong> John, Susan, Megan</li>
- <li><strong>duplicate:</strong> Jerry</li>
- </ul>
-
- <h3>Example 2: Matching on Multiple Record Fields</h3>
- <p>If we wanted to define these records to be unique based on the <code>id</code> and <code>gender</code> fields,
- we would specify two RecordPath mappings: <em>/id => ${field.value}</em> and <em>/gender => ${field.value}</em>.</p>
- <ul>
- <li><strong>non-duplicate:</strong> John, Susan, Megan, Jerry</li>
- <li><strong>duplicate:</strong> <em>None</em></li>
- </ul>
-
- <h3>Example 3: Matching on All Record Fields</h3>
- <p>Do not define any RecordPath properties in the processor to use all fields by default.</p>
- <p>For a record set with JSON like that, the records will be evaluated against the <code>id, name, gender</code>
- fields to determine uniqueness.</p>
- <ul>
- <li><strong>non-duplicate:</strong> John, Susan, Megan, Jerry</li>
- <li><strong>duplicate:</strong> <em>None</em></li>
- </ul>
-</body>
-</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy
deleted file mode 100644
index d0f5d09..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/MockCacheService.groovy
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard
-
-import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.distributed.cache.client.Deserializer
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
-import org.apache.nifi.distributed.cache.client.Serializer
-
-class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient {
- def map = [:]
-
- @Override
- def <K, V> boolean putIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer1) throws IOException {
- def retVal = map.containsKey(k)
- if (retVal) {
- false
- } else {
- map[k] = v
- true
- }
- }
-
- @Override
- def <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer1, Deserializer<V> deserializer) throws IOException {
- return null
- }
-
- @Override
- def <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
- return map.containsKey(k)
- }
-
- @Override
- def <K, V> void put(K k, V v, Serializer<K> serializer, Serializer<V> serializer1) throws IOException {
-
- }
-
- @Override
- def <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
- return null
- }
-
- @Override
- void close() throws IOException {
-
- }
-
- @Override
- def <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
- return false
- }
-
- @Override
- long removeByPattern(String s) throws IOException {
- return 0
- }
-
- void assertContains(String key, String value) {
- assert map.containsKey(key) && map[key] == value
- }
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
new file mode 100644
index 0000000..9de152f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDeduplicateRecord.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDeduplicateRecord {
+
+ private TestRunner runner;
+ private MockRecordParser reader;
+ private MockRecordWriter writer;
+
+ @BeforeEach
+ public void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(DeduplicateRecord.class);
+
+ // RECORD_READER, RECORD_WRITER
+ reader = new MockRecordParser();
+ writer = new MockRecordWriter("header", false);
+
+ runner.addControllerService("reader", reader);
+ runner.enableControllerService(reader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(DeduplicateRecord.RECORD_READER, "reader");
+ runner.setProperty(DeduplicateRecord.RECORD_WRITER, "writer");
+ runner.setProperty(DeduplicateRecord.RECORD_HASHING_ALGORITHM, DeduplicateRecord.SHA256_ALGORITHM_VALUE);
+
+ reader.addSchemaField("firstName", RecordFieldType.STRING);
+ reader.addSchemaField("middleName", RecordFieldType.STRING);
+ reader.addSchemaField("lastName", RecordFieldType.STRING);
+
+ // INCLUDE_ZERO_RECORD_FLOWFILES
+ runner.setProperty(DeduplicateRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true");
+
+ runner.assertValid();
+ }
+
+ void commonEnqueue() {
+ final Map<String, String> props = new HashMap<>();
+ props.put("hash.value", "1000");
+ runner.enqueue(new byte[]{}, props);
+ }
+
+ @Test
+ public void testInvalidRecordPathCausesValidationError() {
+ runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
+ runner.setProperty("middle_name", "//////middleName");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testDetectDuplicatesHashSet() {
+ commonEnqueue();
+
+ runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
+ runner.setProperty("middle_name", "/middleName");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jane", "X", "Doe");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 2, 1);
+ }
+
+ @Test
+ public void testDetectDuplicatesBloomFilter() {
+ commonEnqueue();
+ runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE);
+ runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10");
+ runner.setProperty("middle_name", "/middleName");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jane", "X", "Doe");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 2, 1);
+ }
+
+ @Test
+ public void testNoDuplicatesHashSet() {
+ commonEnqueue();
+ runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.HASH_SET_VALUE);
+ runner.setProperty("middle_name", "/middleName");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jane", "X", "Doe");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 3, 0);
+ }
+
+ @Test
+ public void testNoDuplicatesBloomFilter() {
+ commonEnqueue();
+ runner.setProperty(DeduplicateRecord.FILTER_TYPE, DeduplicateRecord.BLOOM_FILTER_VALUE);
+ runner.setProperty(DeduplicateRecord.BLOOM_FILTER_FPP, "0.10");
+ runner.setProperty("middle_name", "/middleName");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jane", "X", "Doe");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 3, 0);
+ }
+
+ @Test
+ public void testAllDuplicates() {
+ commonEnqueue();
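+ // No dynamic RecordPath properties are set, so the whole record is hashed and
+ // the three identical records collapse to one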
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("John", "Q", "Smith");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 1, 2);
+ }
+
+ @Test
+ public void testAllUnique() {
+ commonEnqueue();
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jane", "X", "Doe");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 3, 0);
+ }
+
+ @Test
+ public void testCacheValueFromRecordPath() {
+ commonEnqueue();
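+ // No RecordPath or cache is configured here; default all-field hashing makes
+ // the repeated "Jack Z Brown" the lone duplicate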
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jack", "Z", "Brown");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 2, 1);
+ }
+
+ /*
+ * The tests below exercise the multi-FlowFile deduplication support added
+ * under NIFI-6014, which requires a DistributedMapCacheClient.
+ */
+
+ @Test
+ public void testMultipleFileDeduplicationRequiresDMC() {
+ runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
+ runner.assertNotValid();
+ }
+
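+ // Pre-computed cache keys mirroring the processor's hashing: SHA-256 over the
+ // record's field values joined with DeduplicateRecord.JOIN_CHAR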
+ public static final String FIRST_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList(
+ "John", "Q", "Smith"
+ )));
+ public static final String SECOND_KEY = DigestUtils.sha256Hex(String.join(String.valueOf(DeduplicateRecord.JOIN_CHAR), Arrays.asList(
+ "Jack", "Z", "Brown"
+ )));
+
+ @Test
+ public void testDeduplicateWithDMC() throws Exception {
+ DistributedMapCacheClient dmc = new MockCacheService();
+ runner.addControllerService("dmc", dmc);
+ runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc");
+ runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
+ runner.enableControllerService(dmc);
+ runner.assertValid();
+
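+ // Seed the cache so "John Q Smith" and "Jack Z Brown" read as already seen;
+ // only "Jane X Doe" should pass through as a non-duplicate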
+ dmc.put(FIRST_KEY, true, null, null);
+ dmc.put(SECOND_KEY, true, null, null);
+
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jane", "X", "Doe");
+
+ runner.enqueue("");
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 1, 3);
+ }
+
+ @Test
+ public void testDeduplicateWithDMCAndCacheIdentifier() throws Exception {
+ DistributedMapCacheClient dmc = new MockCacheService();
+ runner.addControllerService("dmc", dmc);
+ runner.setProperty(DeduplicateRecord.DISTRIBUTED_MAP_CACHE, "dmc");
+ runner.setProperty(DeduplicateRecord.DEDUPLICATION_STRATEGY, DeduplicateRecord.OPTION_MULTIPLE_FILES.getValue());
+ runner.setProperty(DeduplicateRecord.CACHE_IDENTIFIER, "concat('${user.name}', '${record.hash.value}')");
+ runner.enableControllerService(dmc);
+ runner.assertValid();
+
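+ // Cache keys are prefixed with the EL-resolved ${user.name} attribute (note the
+ // trailing hyphen set below) to match the CACHE_IDENTIFIER expression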
+ dmc.put(String.format("john.smith-%s", FIRST_KEY), true, null, null);
+ dmc.put(String.format("john.smith-%s", SECOND_KEY), true, null, null);
+
+ reader.addRecord("John", "Q", "Smith");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jack", "Z", "Brown");
+ reader.addRecord("Jane", "X", "Doe");
+
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("user.name", "john.smith-");
+
+ runner.enqueue("", attrs);
+ runner.run();
+
+ doCountTests(0, 1, 1, 1, 1, 3);
+ }
+
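+ /*
+ * Asserts the per-relationship transfer counts, then checks the record.count
+ * attribute on the first duplicate and non-duplicate FlowFile when present.
+ */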
+ void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) {
+ runner.assertTransferCount(DeduplicateRecord.REL_DUPLICATE, duplicates);
+ runner.assertTransferCount(DeduplicateRecord.REL_NON_DUPLICATE, notDuplicates);
+ runner.assertTransferCount(DeduplicateRecord.REL_ORIGINAL, original);
+ runner.assertTransferCount(DeduplicateRecord.REL_FAILURE, failure);
+
+ // getFlowFilesForRelationship never returns null, so guard on emptiness instead;
+ // calling get(0) on an empty list would throw
+ List<MockFlowFile> duplicateFlowFiles = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_DUPLICATE);
+ if (!duplicateFlowFiles.isEmpty()) {
+ assertEquals(String.valueOf(dupeCount), duplicateFlowFiles.get(0).getAttribute("record.count"));
+ }
+
+ List<MockFlowFile> nonDuplicateFlowFiles = runner.getFlowFilesForRelationship(DeduplicateRecord.REL_NON_DUPLICATE);
+ if (!nonDuplicateFlowFiles.isEmpty()) {
+ assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFiles.get(0).getAttribute("record.count"));
+ }
+ }
+
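+ /*
+ * Minimal in-memory stand-in for a DistributedMapCacheClient. Only put() and
+ * containsKey() are backed by real storage; the remaining methods are inert
+ * stubs, which is all these tests require.
+ */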
+ private static final class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient {
+ private final Map<Object, Object> storage = new HashMap<>();
+
+ @Override
+ public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ return false;
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
+ return storage.containsKey(key);
+ }
+
+ @Override
+ public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ storage.put(key, value);
+ }
+
+ @Override
+ public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
+ return false;
+ }
+
+ @Override
+ public long removeByPattern(String regex) throws IOException {
+ return 0;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java
deleted file mode 100644
index 6855857..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicateRecord.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.record.MockRecordParser;
-import org.apache.nifi.serialization.record.MockRecordWriter;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.util.*;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.processors.standard.DetectDuplicateRecord.*;
-import static org.junit.Assert.assertEquals;
-
-public class TestDetectDuplicateRecord {
-
- private TestRunner runner;
- private MockCacheService cache;
- private MockRecordParser reader;
- private MockRecordWriter writer;
-
- @BeforeClass
- public static void beforeClass() {
- System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicateRecord", "debug");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicateRecord", "debug");
- }
-
- @Before
- public void setup() throws InitializationException {
- runner = TestRunners.newTestRunner(DetectDuplicateRecord.class);
-
- // RECORD_READER, RECORD_WRITER
- reader = new MockRecordParser();
- writer = new MockRecordWriter("header", false);
-
- runner.addControllerService("reader", reader);
- runner.enableControllerService(reader);
- runner.addControllerService("writer", writer);
- runner.enableControllerService(writer);
-
- runner.setProperty(RECORD_READER, "reader");
- runner.setProperty(RECORD_WRITER, "writer");
-
- reader.addSchemaField("firstName", RecordFieldType.STRING);
- reader.addSchemaField("middleName", RecordFieldType.STRING);
- reader.addSchemaField("lastName", RecordFieldType.STRING);
-
- // INCLUDE_ZERO_RECORD_FLOWFILES
- runner.setProperty(INCLUDE_ZERO_RECORD_FLOWFILES, "true");
-
- // CACHE_IDENTIFIER
- runner.setProperty(CACHE_IDENTIFIER, "true");
-
- // DISTRIBUTED_CACHE_SERVICE
- cache = new MockCacheService();
- runner.addControllerService("cache", cache);
- runner.setProperty(DISTRIBUTED_CACHE_SERVICE, "cache");
- runner.enableControllerService(cache);
-
- // CACHE_ENTRY_IDENTIFIER
- final Map<String, String> props = new HashMap<>();
- props.put("hash.value", "1000");
- runner.enqueue(new byte[]{}, props);
-
- // AGE_OFF_DURATION
- runner.setProperty(AGE_OFF_DURATION, "48 hours");
-
- runner.assertValid();
- }
-
- @Test
- public void testDetectDuplicatesHashSet() {
- runner.setProperty(FILTER_TYPE, HASH_SET_VALUE);
- runner.setProperty("/middleName", "${field.value}");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("Jane", "X", "Doe");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 1, 2, 1);
- }
-
- @Test
- public void testDetectDuplicatesBloomFilter() {
- runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE);
- runner.setProperty(BLOOM_FILTER_FPP, "0.10");
- runner.setProperty("/middleName", "${field.value}");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("Jane", "X", "Doe");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 1, 2, 1);
- }
-
- @Test
- public void testNoDuplicatesHashSet() {
- runner.setProperty(FILTER_TYPE, HASH_SET_VALUE);
- runner.setProperty("/middleName", "${field.value}");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("Jack", "Z", "Brown");
- reader.addRecord("Jane", "X", "Doe");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 1, 3, 0);
- }
-
- @Test
- public void testNoDuplicatesBloomFilter() {
- runner.setProperty(FILTER_TYPE, BLOOM_FILTER_VALUE);
- runner.setProperty(BLOOM_FILTER_FPP, "0.10");
- runner.setProperty("/middleName", "${field.value}");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("Jack", "Z", "Brown");
- reader.addRecord("Jane", "X", "Doe");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 1, 3, 0);
- }
-
- @Test
- public void testAllDuplicates() {
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("John", "Q", "Smith");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 0, 1, 2);
- }
-
- @Test
- public void testAllUnique() {
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("Jack", "Z", "Brown");
- reader.addRecord("Jane", "X", "Doe");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 1, 3, 0);
- }
-
-
-
- @Test
- public void testCacheValueFromRecordPath() {
- runner.setProperty(CACHE_ENTRY_IDENTIFIER, "Users");
- reader.addRecord("John", "Q", "Smith");
- reader.addRecord("Jack", "Z", "Brown");
- reader.addRecord("Jane", "X", "Doe");
-
- runner.enqueue("");
- runner.run();
-
- doCountTests(0, 1, 1, 1, 2, 1);
-
- cache.assertContains("KEY", "VALUE"); // TODO: Get the tests running so you can see what the key/value is in serialized form
- }
-
- void doCountTests(int failure, int original, int duplicates, int notDuplicates, int notDupeCount, int dupeCount) {
- runner.assertTransferCount(REL_DUPLICATE, duplicates);
- runner.assertTransferCount(REL_NON_DUPLICATE, notDuplicates);
- runner.assertTransferCount(REL_ORIGINAL, original);
- runner.assertTransferCount(REL_FAILURE, failure);
-
- List<MockFlowFile> duplicateFlowFile = runner.getFlowFilesForRelationship(REL_DUPLICATE);
- if (duplicateFlowFile != null) {
- assertEquals(String.valueOf(dupeCount), duplicateFlowFile.get(0).getAttribute("record.count"));
- }
-
- List<MockFlowFile> nonDuplicateFlowFile = runner.getFlowFilesForRelationship(REL_NON_DUPLICATE);
- if (nonDuplicateFlowFile != null) {
- assertEquals(String.valueOf(notDupeCount), nonDuplicateFlowFile.get(0).getAttribute("record.count"));
- }
- }
-}