You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/09/18 00:04:24 UTC
[2/2] nifi git commit: NIFI-5147,
NIFI-5566 Added CryptographicHashAttribute and
CryptographicHashContent processors. Deprecated HashContent processor. Added
documentation to HashAttribute processor. Added shared HashService and
HashAlgorithm enum. Added
NIFI-5147, NIFI-5566 Added CryptographicHashAttribute and CryptographicHashContent processors.
Deprecated HashContent processor.
Added documentation to HashAttribute processor.
Added shared HashService and HashAlgorithm enum.
Added unit tests.
Added #clearProperties() to TestRunner, StandardProcessorTestRunner, and MockProcessContext.
Updated processor manifest.
Updated Javadoc.
Added documentation about deprecated/renamed components.
Added handling for the UTF-16 character set: it is overridden with UTF-16BE so that no byte order mark (BOM) is inserted before hashing.
This closes #2983.
Co-authored-by: Otto Fowler <ot...@gmail.com>
Signed-off-by: Kevin Doran <kd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cd687740
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cd687740
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cd687740
Branch: refs/heads/master
Commit: cd6877404b8f9eee151815dd1451c18cc79e707c
Parents: 07ad132
Author: Andy LoPresto <al...@apache.org>
Authored: Mon Jun 18 11:00:38 2018 -0400
Committer: Andy LoPresto <al...@apache.org>
Committed: Mon Sep 17 17:03:57 2018 -0700
----------------------------------------------------------------------
.../apache/nifi/util/MockProcessContext.java | 7 +
.../nifi/util/StandardProcessorTestRunner.java | 5 +
.../java/org/apache/nifi/util/TestRunner.java | 5 +
.../standard/CryptographicHashAttribute.java | 255 +++++++++++
.../standard/CryptographicHashContent.java | 163 +++++++
.../nifi/processors/standard/HashAttribute.java | 23 +-
.../nifi/processors/standard/HashContent.java | 9 +-
.../security/util/crypto/HashAlgorithm.java | 160 +++++++
.../nifi/security/util/crypto/HashService.java | 232 ++++++++++
.../org.apache.nifi.processor.Processor | 34 +-
.../CryptographicHashAttributeTest.groovy | 355 ++++++++++++++
.../CryptographicHashContentTest.groovy | 289 ++++++++++++
.../util/crypto/HashAlgorithmTest.groovy | 116 +++++
.../security/util/crypto/HashServiceTest.groovy | 458 +++++++++++++++++++
.../processors/standard/TestHashAttribute.java | 3 -
.../processors/standard/TestHashContent.java | 11 +-
16 files changed, 2087 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 281f4a8..46ce451 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -209,6 +209,13 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
return false;
}
+    /**
+     * Removes every property reported by {@link #getProperties()} by calling
+     * {@link #removeProperty(PropertyDescriptor)} for each one.
+     * <p>
+     * The descriptors are copied into an array before anything is removed, so the loop never iterates a map
+     * that {@code removeProperty} may be modifying. If {@code getProperties()} returns a live view of the
+     * backing map, iterating it directly while removing would risk a
+     * {@link java.util.ConcurrentModificationException}.
+     */
+    public void clearProperties() {
+        final PropertyDescriptor[] descriptors = getProperties().keySet().toArray(new PropertyDescriptor[0]);
+        for (final PropertyDescriptor descriptor : descriptors) {
+            removeProperty(descriptor);
+        }
+    }
+
@Override
public void yield() {
yieldCalled = true;
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index aca9258..89af696 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -825,6 +825,11 @@ public class StandardProcessorTestRunner implements TestRunner {
}
@Override
+    public void clearProperties() {
+        // Forward to this runner's process context, which holds the configured property values.
+        this.context.clearProperties();
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to this runner's {@code sharedState}.
+ */
+ @Override
public List<ProvenanceEventRecord> getProvenanceEvents() {
return sharedState.getProvenanceEvents();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 759bfb8..a15c806 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -865,6 +865,11 @@ public interface TestRunner {
boolean removeProperty(String property);
/**
+ * Removes every property value that has been set on the {@link ProcessContext} of the processor
+ * under test, rather than one property at a time via {@link #removeProperty(String)}.
+ * This lets the same runner be re-configured, for example between test cases.
+ */
+ void clearProperties();
+
+ /**
* Returns a {@link List} of all {@link ProvenanceEventRecord}s that were
* emitted by the Processor
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashAttribute.java
new file mode 100644
index 0000000..053dab3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashAttribute.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.crypto.HashAlgorithm;
+import org.apache.nifi.security.util.crypto.HashService;
+
+/**
+ * Calculates a cryptographic hash of each configured flowfile attribute and writes each hash to its own attribute.
+ * <p>
+ * Every dynamic property names an incoming attribute to hash (the property name) and the attribute that receives
+ * the hash (the property value). Values are hashed with the configured {@link #HASH_ALGORITHM}, using the
+ * configured {@link #CHARACTER_SET}.
+ * <p>
+ * A flowfile is routed to {@link #REL_FAILURE} when none of the configured attributes are present and
+ * {@link #FAIL_WHEN_EMPTY} is {@code true}, or when any of them is missing and {@link #PARTIAL_ATTR_ROUTE_POLICY}
+ * is {@link PartialAttributePolicy#PROHIBIT}; otherwise it is routed to {@link #REL_SUCCESS}.
+ * <p>
+ * Unlike {@link HashAttribute}, which hashes several attributes together into a single value, this processor
+ * hashes each configured attribute separately.
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"attributes", "hash", "md5", "sha", "keccak", "blake2", "cryptography"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Calculates a hash value for each of the specified attributes using the given algorithm and writes it to an output attribute. Please refer to https://csrc.nist.gov/Projects/Hash-Functions/NIST-Policy-on-Hash-Functions for help to decide which algorithm to use. ")
+@WritesAttribute(attribute = "<Specified Attribute Name per Dynamic Property>", description = "This Processor adds an attribute whose value is the result of "
+        + "hashing the specified attribute. The name of this attribute is specified by the value of the dynamic property.")
+@DynamicProperty(name = "A flowfile attribute key for attribute inspection", value = "Attribute Name",
+        description = "The property name defines the attribute to look for and hash in the incoming flowfile. "
+                + "The property value defines the name to give the generated attribute. "
+                + "Attribute names must be unique.")
+public class CryptographicHashAttribute extends AbstractProcessor {
+
+    /**
+     * Controls whether a flowfile that lacks any of the configured attributes is routed to failure.
+     */
+    public enum PartialAttributePolicy {
+        ALLOW,
+        PROHIBIT
+    }
+
+    private static final AllowableValue ALLOW_PARTIAL_ATTRIBUTES_VALUE = new AllowableValue(PartialAttributePolicy.ALLOW.name(),
+            "Allow missing attributes",
+            "Do not route to failure if there are attributes configured for hashing that are not present in the flowfile");
+
+    private static final AllowableValue FAIL_PARTIAL_ATTRIBUTES_VALUE = new AllowableValue(PartialAttributePolicy.PROHIBIT.name(),
+            "Fail if missing attributes",
+            "Route to failure if there are attributes configured for hashing that are not present in the flowfile");
+
+    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+            .name("character_set")
+            .displayName("Character Set")
+            .description("The Character Set used to decode the attribute being hashed -- this applies to the incoming data encoding, not the resulting hash encoding. ")
+            .required(true)
+            .allowableValues(HashService.buildCharacterSetAllowableValues())
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue("UTF-8")
+            .build();
+
+    static final PropertyDescriptor FAIL_WHEN_EMPTY = new PropertyDescriptor.Builder()
+            .name("fail_when_empty")
+            .displayName("Fail when no attributes present")
+            .description("Route to failure when none of the attributes that are configured for hashing are found. " +
+                    "If set to false, then flow files that do not contain any of the attributes that are configured for hashing will just pass through to success.")
+            .allowableValues("true", "false")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("true")
+            .build();
+
+    static final PropertyDescriptor HASH_ALGORITHM = new PropertyDescriptor.Builder()
+            .name("hash_algorithm")
+            .displayName("Hash Algorithm")
+            .description("The cryptographic hash algorithm to use. Note that not all of the algorithms available are recommended for use (some are provided for legacy use). " +
+                    "There are many things to consider when picking an algorithm; it is recommended to use the most secure algorithm possible.")
+            .required(true)
+            .allowableValues(HashService.buildHashAlgorithmAllowableValues())
+            .defaultValue(HashAlgorithm.SHA256.getName())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor PARTIAL_ATTR_ROUTE_POLICY = new PropertyDescriptor.Builder()
+            .name("missing_attr_policy")
+            .displayName("Missing attribute policy")
+            .description("Policy for how the processor handles attributes that are configured for hashing but are not found in the flowfile.")
+            .required(true)
+            .allowableValues(ALLOW_PARTIAL_ATTRIBUTES_VALUE, FAIL_PARTIAL_ATTRIBUTES_VALUE)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue(ALLOW_PARTIAL_ATTRIBUTES_VALUE.getValue())
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Used for flowfiles that have a hash value added")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Used for flowfiles that are missing required attributes")
+            .build();
+
+    private static final Set<Relationship> relationships;
+
+    private static final List<PropertyDescriptor> properties;
+
+    // Dynamic properties as (source attribute name -> destination attribute name). The map is replaced, never
+    // mutated, so onTrigger always reads a complete, unmodifiable snapshot.
+    private final AtomicReference<Map<String, String>> attributeToGenerateNameMapRef = new AtomicReference<>(Collections.emptyMap());
+
+    static {
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(_relationships);
+
+        final List<PropertyDescriptor> _properties = new ArrayList<>();
+        _properties.add(CHARACTER_SET);
+        _properties.add(FAIL_WHEN_EMPTY);
+        _properties.add(HASH_ALGORITHM);
+        _properties.add(PARTIAL_ATTR_ROUTE_POLICY);
+        properties = Collections.unmodifiableList(_properties);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Accepts any dynamic property with a non-blank value: the property name is the attribute to hash and the
+     * value is the name of the attribute that receives the hash.
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build();
+    }
+
+    /**
+     * Keeps the attribute-name mapping in sync with the dynamic properties. Required (static) properties are
+     * ignored, and a {@code null} new value removes the mapping for that property.
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.isRequired()) {
+            return;
+        }
+
+        final Map<String, String> attributeToGeneratedNameMap = new HashMap<>(attributeToGenerateNameMapRef.get());
+        if (newValue == null) {
+            attributeToGeneratedNameMap.remove(descriptor.getName());
+        } else {
+            attributeToGeneratedNameMap.put(descriptor.getName(), newValue);
+        }
+
+        attributeToGenerateNameMapRef.set(Collections.unmodifiableMap(attributeToGeneratedNameMap));
+    }
+
+    /**
+     * Hashes each configured attribute present on the incoming flowfile and routes the flowfile according to
+     * {@link #FAIL_WHEN_EMPTY} and {@link #PARTIAL_ATTR_ROUTE_POLICY}.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+        final Map<String, String> attributeToGeneratedNameMap = attributeToGenerateNameMapRef.get();
+        final ComponentLog logger = getLogger();
+
+        final SortedMap<String, String> relevantAttributes = getRelevantAttributes(flowFile, attributeToGeneratedNameMap);
+        if (relevantAttributes.isEmpty() && context.getProperty(FAIL_WHEN_EMPTY).asBoolean()) {
+            logger.info("Routing {} to 'failure' because of missing all attributes: {}", new Object[]{flowFile, getMissingKeysString(null, attributeToGeneratedNameMap.keySet())});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        if (relevantAttributes.size() != attributeToGeneratedNameMap.size()
+                && PartialAttributePolicy.valueOf(context.getProperty(PARTIAL_ATTR_ROUTE_POLICY).getValue()) == PartialAttributePolicy.PROHIBIT) {
+            logger.info("Routing {} to 'failure' because of missing attributes: {}", new Object[]{flowFile,
+                    getMissingKeysString(relevantAttributes.keySet(), attributeToGeneratedNameMap.keySet())});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // Determine the algorithm to use
+        final String algorithmName = context.getProperty(HASH_ALGORITHM).getValue();
+        logger.debug("Using algorithm {}", new Object[]{algorithmName});
+        final HashAlgorithm algorithm = HashAlgorithm.fromName(algorithmName);
+
+        // Hash each present attribute value and collect the result under its configured destination name
+        final Map<String, String> generatedAttributes = new HashMap<>();
+        for (final Map.Entry<String, String> entry : relevantAttributes.entrySet()) {
+            logger.debug("Generating {} hash of attribute '{}'", new Object[]{algorithmName, entry.getKey()});
+            final String value = hashValue(algorithm, entry.getValue(), charset);
+            generatedAttributes.put(attributeToGeneratedNameMap.get(entry.getKey()), value);
+        }
+
+        // Session attribute updates return a new FlowFile version; keep that reference so the provenance
+        // event and the transfer refer to the flowfile that actually carries the hash attributes.
+        flowFile = session.putAllAttributes(flowFile, generatedAttributes);
+        session.getProvenanceReporter().modifyAttributes(flowFile);
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    /**
+     * Collects the configured attributes that are present on the flowfile.
+     *
+     * @param flowFile                    the incoming flowfile
+     * @param attributeToGeneratedNameMap source attribute name to destination attribute name
+     * @return the present attributes and their values, sorted by attribute name
+     */
+    private static SortedMap<String, String> getRelevantAttributes(final FlowFile flowFile, final Map<String, String> attributeToGeneratedNameMap) {
+        final SortedMap<String, String> attributeMap = new TreeMap<>();
+        for (final String attributeName : attributeToGeneratedNameMap.keySet()) {
+            final String attributeValue = flowFile.getAttribute(attributeName);
+            if (attributeValue != null) {
+                attributeMap.put(attributeName, attributeValue);
+            }
+        }
+        return attributeMap;
+    }
+
+    /**
+     * Hashes a single attribute value; a {@code null} value is logged as a warning and yields an empty string.
+     */
+    private String hashValue(final HashAlgorithm algorithm, final String value, final Charset charset) {
+        if (value == null) {
+            getLogger().warn("Tried to calculate {} hash of null value; returning empty string", new Object[]{algorithm.getName()});
+            return "";
+        }
+        return HashService.hashValue(algorithm, value, charset);
+    }
+
+    /**
+     * Lists the wanted keys that are absent from {@code foundKeys}, each followed by a space; when
+     * {@code foundKeys} is {@code null}, every wanted key is reported as missing.
+     */
+    private static String getMissingKeysString(final Set<String> foundKeys, final Set<String> wantedKeys) {
+        final StringBuilder missingKeys = new StringBuilder();
+        for (final String wantedKey : wantedKeys) {
+            if (foundKeys == null || !foundKeys.contains(wantedKey)) {
+                missingKeys.append(wantedKey).append(" ");
+            }
+        }
+        return missingKeys.toString();
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashContent.java
new file mode 100644
index 0000000..f681ab8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CryptographicHashContent.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.crypto.HashAlgorithm;
+import org.apache.nifi.security.util.crypto.HashService;
+
+/**
+ * Calculates a cryptographic hash of the flowfile content and writes it to the attribute
+ * {@code content_<algorithm name>}, e.g. {@code content_SHA-256}.
+ * <p>
+ * Empty content is hashed like any other content unless {@link #FAIL_WHEN_EMPTY} is {@code true}, in which case
+ * the flowfile is routed to {@link #REL_FAILURE}. A {@link ProcessException} raised while hashing the content
+ * also routes the flowfile to {@link #REL_FAILURE}.
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"content", "hash", "sha", "blake2", "md5", "cryptography"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Calculates a cryptographic hash value for the flowfile content using the given algorithm and writes it to an output attribute. Please refer to https://csrc.nist.gov/Projects/Hash-Functions/NIST-Policy-on-Hash-Functions for help to decide which algorithm to use.")
+@WritesAttribute(attribute = "content_<algorithm>", description = "This processor adds an attribute whose value is the result of "
+        + "hashing the flowfile content. The name of this attribute is specified by the value of the algorithm, e.g. 'content_SHA-256'.")
+public class CryptographicHashContent extends AbstractProcessor {
+
+    static final PropertyDescriptor FAIL_WHEN_EMPTY = new PropertyDescriptor.Builder()
+            .name("fail_when_empty")
+            .displayName("Fail if the content is empty")
+            .description("Route to failure if the content is empty. " +
+                    "While hashing an empty value is valid, some flows may want to detect empty input.")
+            .allowableValues("true", "false")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor HASH_ALGORITHM = new PropertyDescriptor.Builder()
+            .name("hash_algorithm")
+            .displayName("Hash Algorithm")
+            .description("The hash algorithm to use. Note that not all of the algorithms available are recommended for use (some are provided for legacy compatibility). " +
+                    "There are many things to consider when picking an algorithm; it is recommended to use the most secure algorithm possible.")
+            .required(true)
+            .allowableValues(HashService.buildHashAlgorithmAllowableValues())
+            .defaultValue(HashAlgorithm.SHA256.getName())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Used for flowfiles that have a hash value added")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Used for flowfiles that have no content if the 'fail on empty' setting is enabled")
+            .build();
+
+    // Shared by every instance, so built exactly once in the static initializer below and never reassigned.
+    private static final Set<Relationship> relationships;
+
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_FAILURE);
+        _relationships.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(_relationships);
+
+        final List<PropertyDescriptor> _properties = new ArrayList<>();
+        _properties.add(FAIL_WHEN_EMPTY);
+        _properties.add(HASH_ALGORITHM);
+        properties = Collections.unmodifiableList(_properties);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Hashes the content of the next flowfile with the configured algorithm, writes the hash to
+     * {@code content_<algorithm name>} and routes the flowfile to {@link #REL_SUCCESS}, or to
+     * {@link #REL_FAILURE} when the content is empty and {@link #FAIL_WHEN_EMPTY} is set, or when a
+     * {@link ProcessException} occurs.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        final ComponentLog logger = getLogger();
+
+        // Determine the algorithm to use
+        final String algorithmName = context.getProperty(HASH_ALGORITHM).getValue();
+        logger.debug("Using algorithm {}", new Object[]{algorithmName});
+        final HashAlgorithm algorithm = HashAlgorithm.fromName(algorithmName);
+
+        if (flowFile.getSize() == 0) {
+            if (context.getProperty(FAIL_WHEN_EMPTY).asBoolean()) {
+                // Pass the flowfile so the {} placeholder in the message is filled in
+                logger.info("Routing {} to 'failure' because content is empty (and FAIL_WHEN_EMPTY is true)", new Object[]{flowFile});
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+            logger.debug("Flowfile content is empty; hashing with {} anyway", new Object[]{algorithmName});
+        }
+
+        // Generate a hash with the configured algorithm for the content
+        // and create a new attribute with the configured name
+        logger.debug("Generating {} hash of content", new Object[]{algorithmName});
+        final AtomicReference<String> hashValueHolder = new AtomicReference<>(null);
+
+        try {
+            // Read the flowfile content via a lambda InputStreamCallback and hash the content
+            session.read(flowFile, in -> hashValueHolder.set(HashService.hashValueStreaming(algorithm, in)));
+
+            // Determine the destination attribute name
+            final String attributeName = "content_" + algorithmName;
+            logger.debug("Writing {} hash to attribute '{}'", new Object[]{algorithmName, attributeName});
+
+            // Write the attribute; putAttribute returns the updated flowfile reference
+            flowFile = session.putAttribute(flowFile, attributeName, hashValueHolder.get());
+            logger.info("Successfully added attribute '{}' to {} with a value of {}; routing to success", new Object[]{attributeName, flowFile, hashValueHolder.get()});
+
+            // Update provenance and route to success
+            session.getProvenanceReporter().modifyAttributes(flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (ProcessException e) {
+            logger.error("Failed to process {} due to {}; routing to failure", new Object[]{flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
index e263705..8d7fe35 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashAttribute.java
@@ -28,7 +28,6 @@ import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -51,6 +50,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
/**
+ * This processor <strong>does not calculate a cryptographic hash of one or more attributes</strong>.
+ * For that behavior, see {@link CryptographicHashAttribute}.
+ *
* <p>
* This processor identifies groups of user-specified flowfile attributes and assigns a unique hash value to each group, recording this hash value in the flowfile's attributes using a user-specified
* attribute key. The groups are identified dynamically and preserved across application restarts. </p>
@@ -91,7 +93,7 @@ import org.apache.nifi.processor.util.StandardValidators;
* </p>
*
* <p>
- * The following flow file attributes are created or modified: <ul>
+ * The following flowfile attributes are created or modified: <ul>
* <li><b><group.id.attribute.key></b> - The hash value.</li> </ul> </p>
*/
@EventDriven
@@ -99,13 +101,13 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@Tags({"attributes", "hash"})
@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Hashes together the key/value pairs of several FlowFile Attributes and adds the hash as a new attribute. "
- + "Optional properties are to be added such that the name of the property is the name of a FlowFile Attribute to consider "
+@CapabilityDescription("Hashes together the key/value pairs of several flowfile attributes and adds the hash as a new attribute. "
+ + "Optional properties are to be added such that the name of the property is the name of a flowfile attribute to consider "
+ "and the value of the property is a regular expression that, if matched by the attribute value, will cause that attribute "
+ "to be used as part of the hash. If the regular expression contains a capturing group, only the value of the capturing "
- + "group will be used.")
+ + "group will be used. " + "For a processor which accepts various attributes and generates a cryptographic hash of each, see \"CryptographicHashAttribute\". ")
@WritesAttribute(attribute = "<Hash Value Attribute Key>", description = "This Processor adds an attribute whose value is the result of "
- + "Hashing the existing FlowFile attributes. The name of this attribute is specified by the <Hash Value Attribute Key> property.")
+ + "Hashing the existing flowfile attributes. The name of this attribute is specified by the <Hash Value Attribute Key> property.")
@DynamicProperty(name = "A flowfile attribute key for attribute inspection", value = "A Regular Expression",
description = "This regular expression is evaluated against the "
+ "flowfile attribute values. If the regular expression contains a capturing "
@@ -116,23 +118,24 @@ public class HashAttribute extends AbstractProcessor {
public static final PropertyDescriptor HASH_VALUE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Hash Value Attribute Key")
- .description("The name of the FlowFile Attribute where the hash value should be stored")
+ .displayName("Hash Value Attribute Key")
+ .description("The name of the flowfile attribute where the hash value should be stored")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
- .description("Used for FlowFiles that have a hash value added")
+ .description("Used for flowfiles that have a hash value added")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
- .description("Used for FlowFiles that are missing required attributes")
+ .description("Used for flowfiles that are missing required attributes")
.build();
private Set<Relationship> relationships;
private List<PropertyDescriptor> properties;
- private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections.<String, Pattern>emptyMap());
+ private final AtomicReference<Map<String, Pattern>> regexMapRef = new AtomicReference<>(Collections.emptyMap());
@Override
protected void init(final ProcessorInitializationContext context) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
index ade251c..52608b0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HashContent.java
@@ -28,13 +28,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@@ -50,12 +50,15 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
+@Deprecated
+@DeprecationNotice(classNames = {"org.apache.nifi.processors.standard.CryptographicHashContent"}, reason = "This processor is deprecated and may be removed in future releases.")
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"hash", "content", "MD5", "SHA-1", "SHA-256"})
@CapabilityDescription("Calculates a hash value for the Content of a FlowFile and puts that hash value on the FlowFile as an attribute whose name "
- + "is determined by the <Hash Attribute Name> property")
+ + "is determined by the <Hash Attribute Name> property. "
+ + "This processor did not provide a consistent offering of hash algorithms, and is now deprecated. For modern cryptographic hashing capabilities, see \"CryptographicHashContent\". ")
@WritesAttribute(attribute = "<Hash Attribute Name>", description = "This Processor adds an attribute whose value is the result of Hashing the "
+ "existing FlowFile content. The name of this attribute is specified by the <Hash Attribute Name> property")
public class HashContent extends AbstractProcessor {
@@ -141,7 +144,7 @@ public class HashContent extends AbstractProcessor {
final byte[] hash = digest.digest();
final StringBuilder strb = new StringBuilder(hash.length * 2);
for (int i = 0; i < hash.length; i++) {
- strb.append(Integer.toHexString((hash[i] & 0xFF) | 0x100).substring(1, 3));
+ strb.append(Integer.toHexString((hash[i] & 0xFF) | 0x100), 1, 3);
}
hashValueHolder.set(strb.toString());
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashAlgorithm.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashAlgorithm.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashAlgorithm.java
new file mode 100644
index 0000000..1d9413d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashAlgorithm.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util.crypto;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.nifi.processors.standard.CryptographicHashAttribute;
+import org.apache.nifi.processors.standard.HashContent;
+
+/**
+ * Enumeration capturing information about the cryptographic hash algorithms used in
+ * {@link CryptographicHashAttribute}, {@link org.apache.nifi.processors.standard.CryptographicHashContent}, and
+ * {@link HashContent} processors.
+ * <p>
+ * Each constant carries the JCA/display name, the digest output length in bytes, and a short description.
+ */
+public enum HashAlgorithm {
+
+    MD2("MD2", 16, "Cryptographically broken due to collisions"),
+    MD5("MD5", 16, "Cryptographically broken due to collisions"),
+    SHA1("SHA-1", 20, "Cryptographically broken due to collisions"),
+    SHA224("SHA-224", 28, "SHA-2 family"),
+    SHA256("SHA-256", 32, "SHA-2 family"),
+    SHA384("SHA-384", 48, "SHA-2 family"),
+    SHA512("SHA-512", 64, "SHA-2 family"),
+    SHA512_224("SHA-512/224", 28, "SHA-2 using SHA-512 with truncated output"),
+    SHA512_256("SHA-512/256", 32, "SHA-2 using SHA-512 with truncated output"),
+    SHA3_224("SHA3-224", 28, "Keccak-based SHA3 family"),
+    SHA3_256("SHA3-256", 32, "Keccak-based SHA3 family"),
+    SHA3_384("SHA3-384", 48, "Keccak-based SHA3 family"),
+    SHA3_512("SHA3-512", 64, "Keccak-based SHA3 family"),
+    BLAKE2_160("BLAKE2-160", 20, "Also known as Blake2b"),
+    BLAKE2_256("BLAKE2-256", 32, "Also known as Blake2b"),
+    BLAKE2_384("BLAKE2-384", 48, "Also known as Blake2b"),
+    BLAKE2_512("BLAKE2-512", 64, "Also known as Blake2b");
+
+    private final String name;
+    private final int digestBytesLength;
+    private final String description;
+
+    // Names of the algorithms reported as broken by isStrongAlgorithm()
+    private static final List<String> BROKEN_ALGORITHMS = Arrays.asList(MD2.name, MD5.name, SHA1.name);
+
+    HashAlgorithm(String name, int digestBytesLength, String description) {
+        this.name = name;
+        this.digestBytesLength = digestBytesLength;
+        this.description = description;
+    }
+
+    /**
+     * @return the algorithm name, e.g. {@code "SHA-256"} or {@code "BLAKE2-512"}
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return the length of the digest output in bytes
+     */
+    public int getDigestBytesLength() {
+        return digestBytesLength;
+    }
+
+    /**
+     * @return the short human-readable description of the algorithm
+     */
+    public String getDescription() {
+        return description;
+    }
+
+    /**
+     * Returns {@code true} if this algorithm is considered cryptographically secure. These determinations were made as of 2018-08-30.
+     * <p>
+     * Current strong algorithms:
+     * <ul>
+     * <li>SHA-224, SHA-256, SHA-384, SHA-512, SHA-512/224, SHA-512/256 (SHA-2)</li>
+     * <li>SHA3-224, SHA3-256, SHA3-384, SHA3-512</li>
+     * <li>Blake2b-160, Blake2b-256, Blake2b-384, Blake2b-512</li>
+     * </ul>
+     * Current broken algorithms:
+     * <ul>
+     * <li>MD2</li>
+     * <li>MD5</li>
+     * <li>SHA-1</li>
+     * </ul>
+     *
+     * @return true if the algorithm is considered strong
+     */
+    public boolean isStrongAlgorithm() {
+        return !BROKEN_ALGORITHMS.contains(name);
+    }
+
+    /**
+     * Returns {@code true} if this hash algorithm is Blake2, as it requires different initialization through BouncyCastle.
+     *
+     * @return true if this algorithm is in the Blake2 family
+     */
+    public boolean isBlake2() {
+        return name.contains("BLAKE2");
+    }
+
+    @Override
+    public String toString() {
+        // Pass the style explicitly. ToStringBuilder.setDefaultStyle() mutates JVM-wide state and only
+        // affects builders constructed after the call, so the first toString() used the old style.
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+                .append("Algorithm Name", name)
+                .append("Digest Length", digestBytesLength + " bytes")
+                .append("Description", description)
+                .toString();
+    }
+
+    /**
+     * Returns a more complete description of the algorithm for {@link org.apache.nifi.components.AllowableValue} construction.
+     * <p>
+     * Ex:
+     * <pre>
+     * {@code description} -- Cryptographically broken due to collisions
+     * {@code buildAllowableValueDescription} -- SHA-1 (20 byte output) [WARNING -- Cryptographically broken] Cryptographically broken due to collisions
+     * </pre>
+     *
+     * @return the description for dropdown help
+     */
+    public String buildAllowableValueDescription() {
+        final StringBuilder sb = new StringBuilder(name);
+        sb.append(" (").append(digestBytesLength).append(" byte output)");
+        if (!isStrongAlgorithm()) {
+            sb.append(" [WARNING -- Cryptographically broken]");
+        }
+        if (StringUtils.isNotBlank(description)) {
+            sb.append(" ").append(description);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Returns the algorithm whose name matches {@code algorithmName}, ignoring case.
+     *
+     * @param algorithmName the algorithm name, e.g. {@code "SHA-256"}
+     * @return the matching algorithm
+     * @throws IllegalArgumentException if no algorithm matches (including when {@code algorithmName} is {@code null})
+     */
+    public static HashAlgorithm fromName(String algorithmName) {
+        // Call equalsIgnoreCase on the constant's name so a null argument yields the documented
+        // IllegalArgumentException instead of a NullPointerException
+        return Arrays.stream(values())
+                .filter(algo -> algo.name.equalsIgnoreCase(algorithmName))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException("No algorithm matches " + algorithmName));
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashService.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashService.java
new file mode 100644
index 0000000..85d0dba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/security/util/crypto/HashService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util.crypto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.processors.standard.CryptographicHashAttribute;
+import org.apache.nifi.processors.standard.CryptographicHashContent;
+import org.apache.nifi.processors.standard.HashAttribute;
+import org.apache.nifi.processors.standard.HashContent;
+import org.bouncycastle.crypto.digests.Blake2bDigest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a generic service for cryptographic hashing. It is used in
+ * {@link CryptographicHashAttribute}, {@link CryptographicHashContent}, {@link HashAttribute}, and
+ * {@link HashContent}.
+ * <p>
+ * See also:
+ * * {@link HashAlgorithm}
+ */
+public class HashService {
+ private static final Logger logger = LoggerFactory.getLogger(HashService.class);
+ private static final int BUFFER_SIZE = 8192;
+ private static final String UTF_16_DESCRIPTION = "This character set normally decodes using an optional BOM at the beginning of the data but encodes by inserting a BE BOM. " +
+ "For hashing, it will be replaced with UTF-16BE. ";
+
+ /**
+ * Returns an array of {@link AllowableValue} elements for each {@link HashAlgorithm}. The
+ * complete {@code description} is built from the digest length, safety warnings, etc. See
+ * {@link HashAlgorithm#buildAllowableValueDescription()}.
+ *
+ * @return an ordered {@code AllowableValue[]} containing the values
+ */
+ public static AllowableValue[] buildHashAlgorithmAllowableValues() {
+ final HashAlgorithm[] hashAlgorithms = HashAlgorithm.values();
+ List<AllowableValue> allowableValues = new ArrayList<>(hashAlgorithms.length);
+ for (HashAlgorithm algorithm : hashAlgorithms) {
+ allowableValues.add(new AllowableValue(algorithm.getName(), algorithm.getName(), algorithm.buildAllowableValueDescription()));
+ }
+
+ return allowableValues.toArray(new AllowableValue[0]);
+ }
+
+ /**
+ * Returns an array of {@link AllowableValue} elements for each {@link Charset}. Only the charsets in {@link StandardCharsets} are returned to be consistent across JVM instances.
+ *
+ * @return an ordered {@code AllowableValue[]} containing the values
+ */
+ public static AllowableValue[] buildCharacterSetAllowableValues() {
+ final List<Charset> charsets = getSupportedCharsets();
+ return charsets.stream().map(cs ->
+ new AllowableValue(cs.name(),
+ cs.displayName(),
+ cs == StandardCharsets.UTF_16 ? UTF_16_DESCRIPTION : cs.displayName())
+ ).toArray(AllowableValue[]::new);
+ }
+
+ /**
+ * Returns a {@link List} of supported {@link Charset}s on this platform. This is not a complete
+ * list, as only the charsets in {@link StandardCharsets} are returned to be consistent across
+ * JVM instances.
+ *
+ * @return the list of charsets
+ */
+ public static List<Charset> getSupportedCharsets() {
+ return Arrays.asList(StandardCharsets.US_ASCII,
+ StandardCharsets.ISO_8859_1,
+ StandardCharsets.UTF_8,
+ StandardCharsets.UTF_16BE,
+ StandardCharsets.UTF_16LE,
+ StandardCharsets.UTF_16);
+ }
+
+ /**
+ * Returns the hash of the specified value. This method uses an {@link java.io.InputStream} to perform the operation in a streaming manner for large inputs.
+ *
+ * @param algorithm the hash algorithm to use
+ * @param value the value to hash (cannot be {@code null} but can be an empty stream)
+ * @return the hash value in hex
+ */
+ public static String hashValueStreaming(HashAlgorithm algorithm, InputStream value) throws IOException {
+ if (algorithm == null) {
+ throw new IllegalArgumentException("The hash algorithm cannot be null");
+ }
+ if (value == null) {
+ throw new IllegalArgumentException("The value cannot be null");
+ }
+ // The Blake2 algorithms are instantiated differently and rely on BouncyCastle
+ if (algorithm.isBlake2()) {
+ return Hex.encodeHexString(blake2HashStreaming(algorithm, value));
+ } else {
+ return Hex.encodeHexString(traditionalHashStreaming(algorithm, value));
+ }
+ }
+
+ /**
+ * Returns the hex-encoded hash of the specified value.
+ *
+ * @param algorithm the hash algorithm to use
+ * @param value the value to hash (cannot be {@code null} but can be an empty String)
+ * @param charset the charset to use
+ * @return the hash value in hex
+ */
+ public static String hashValue(HashAlgorithm algorithm, String value, Charset charset) {
+ byte[] rawHash = hashValueRaw(algorithm, value, charset);
+ return Hex.encodeHexString(rawHash);
+ }
+
+ /**
+ * Returns the hex-encoded hash of the specified value. The default charset ({@code StandardCharsets.UTF_8}) is used.
+ *
+ * @param algorithm the hash algorithm to use
+ * @param value the value to hash (cannot be {@code null} but can be an empty String)
+ * @return the hash value in hex
+ */
+ public static String hashValue(HashAlgorithm algorithm, String value) {
+ return hashValue(algorithm, value, StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Returns the raw {@code byte[]} hash of the specified value.
+ *
+ * @param algorithm the hash algorithm to use
+ * @param value the value to hash (cannot be {@code null} but can be an empty String)
+ * @param charset the charset to use
+ * @return the hash value in bytes
+ */
+ public static byte[] hashValueRaw(HashAlgorithm algorithm, String value, Charset charset) {
+ if (value == null) {
+ throw new IllegalArgumentException("The value cannot be null");
+ }
+ /** See the note on {@link HashServiceTest#testHashValueShouldHandleUTF16BOMIssue()} */
+ if (charset == StandardCharsets.UTF_16) {
+ logger.warn("The charset provided was UTF-16, but Java will insert a Big Endian BOM in the decoded message before hashing, so switching to UTF-16BE");
+ charset = StandardCharsets.UTF_16BE;
+ }
+ return hashValueRaw(algorithm, value.getBytes(charset));
+ }
+
+ /**
+ * Returns the raw {@code byte[]} hash of the specified value. The default charset ({@code StandardCharsets.UTF_8}) is used.
+ *
+ * @param algorithm the hash algorithm to use
+ * @param value the value to hash (cannot be {@code null} but can be an empty String)
+ * @return the hash value in bytes
+ */
+ public static byte[] hashValueRaw(HashAlgorithm algorithm, String value) {
+ return hashValueRaw(algorithm, value, StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Returns the raw {@code byte[]} hash of the specified value.
+ *
+ * @param algorithm the hash algorithm to use
+ * @param value the value to hash
+ * @return the hash value in bytes
+ */
+ public static byte[] hashValueRaw(HashAlgorithm algorithm, byte[] value) {
+ if (algorithm == null) {
+ throw new IllegalArgumentException("The hash algorithm cannot be null");
+ }
+ if (value == null) {
+ throw new IllegalArgumentException("The value cannot be null");
+ }
+ if (algorithm.isBlake2()) {
+ return blake2Hash(algorithm, value);
+ } else {
+ return traditionalHash(algorithm, value);
+ }
+ }
+
+ private static byte[] traditionalHash(HashAlgorithm algorithm, byte[] value) {
+ return DigestUtils.getDigest(algorithm.getName()).digest(value);
+ }
+
+ private static byte[] traditionalHashStreaming(HashAlgorithm algorithm, InputStream value) throws IOException {
+ MessageDigest digest = DigestUtils.getDigest(algorithm.getName());
+ // DigestInputStream digestInputStream = new DigestInputStream(value, digest);
+ return DigestUtils.digest(digest, value);
+ }
+
+ private static byte[] blake2Hash(HashAlgorithm algorithm, byte[] value) {
+ int digestLengthBytes = algorithm.getDigestBytesLength();
+ Blake2bDigest blake2bDigest = new Blake2bDigest(digestLengthBytes * 8);
+ byte[] rawHash = new byte[blake2bDigest.getDigestSize()];
+ blake2bDigest.update(value, 0, value.length);
+ blake2bDigest.doFinal(rawHash, 0);
+ return rawHash;
+ }
+
+ private static byte[] blake2HashStreaming(HashAlgorithm algorithm, InputStream value) throws IOException {
+ int digestLengthBytes = algorithm.getDigestBytesLength();
+ Blake2bDigest blake2bDigest = new Blake2bDigest(digestLengthBytes * 8);
+ byte[] rawHash = new byte[blake2bDigest.getDigestSize()];
+
+ final byte[] buffer = new byte[BUFFER_SIZE];
+ int read = value.read(buffer, 0, BUFFER_SIZE);
+
+ while (read > -1) {
+ blake2bDigest.update(buffer, 0, read);
+ read = value.read(buffer, 0, BUFFER_SIZE);
+ }
+
+ blake2bDigest.doFinal(rawHash, 0);
+ return rawHash;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 8fc361d..d21b7f4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,8 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.processors.standard.AttributesToJSON
org.apache.nifi.processors.standard.AttributesToCSV
+org.apache.nifi.processors.standard.AttributesToJSON
org.apache.nifi.processors.standard.Base64EncodeContent
org.apache.nifi.processors.standard.CalculateRecordStats
org.apache.nifi.processors.standard.CompressContent
@@ -22,6 +22,8 @@ org.apache.nifi.processors.standard.ConvertCharacterSet
org.apache.nifi.processors.standard.ConvertJSONToSQL
org.apache.nifi.processors.standard.ConvertRecord
org.apache.nifi.processors.standard.CountText
+org.apache.nifi.processors.standard.CryptographicHashAttribute
+org.apache.nifi.processors.standard.CryptographicHashContent
org.apache.nifi.processors.standard.DebugFlow
org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DistributeLoad
@@ -31,17 +33,24 @@ org.apache.nifi.processors.standard.EnforceOrder
org.apache.nifi.processors.standard.EvaluateJsonPath
org.apache.nifi.processors.standard.EvaluateXPath
org.apache.nifi.processors.standard.EvaluateXQuery
-org.apache.nifi.processors.standard.ExecuteStreamCommand
org.apache.nifi.processors.standard.ExecuteProcess
+org.apache.nifi.processors.standard.ExecuteSQL
+org.apache.nifi.processors.standard.ExecuteStreamCommand
+org.apache.nifi.processors.standard.ExtractGrok
org.apache.nifi.processors.standard.ExtractText
-org.apache.nifi.processors.standard.FetchSFTP
+org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.FetchFile
+org.apache.nifi.processors.standard.FetchFTP
+org.apache.nifi.processors.standard.FetchSFTP
org.apache.nifi.processors.standard.FlattenJson
org.apache.nifi.processors.standard.ForkRecord
org.apache.nifi.processors.standard.GenerateFlowFile
+org.apache.nifi.processors.standard.GenerateTableFetch
org.apache.nifi.processors.standard.GetFile
org.apache.nifi.processors.standard.GetFTP
org.apache.nifi.processors.standard.GetHTTP
+org.apache.nifi.processors.standard.GetJMSQueue
+org.apache.nifi.processors.standard.GetJMSTopic
org.apache.nifi.processors.standard.GetSFTP
org.apache.nifi.processors.standard.HandleHttpRequest
org.apache.nifi.processors.standard.HandleHttpResponse
@@ -50,12 +59,7 @@ org.apache.nifi.processors.standard.HashContent
org.apache.nifi.processors.standard.IdentifyMimeType
org.apache.nifi.processors.standard.InvokeHTTP
org.apache.nifi.processors.standard.JoltTransformJSON
-org.apache.nifi.processors.standard.GenerateTableFetch
-org.apache.nifi.processors.standard.GetJMSQueue
-org.apache.nifi.processors.standard.GetJMSTopic
-org.apache.nifi.processors.standard.ExtractGrok
org.apache.nifi.processors.standard.ListDatabaseTables
-org.apache.nifi.processors.standard.ListFile
org.apache.nifi.processors.standard.ListenHTTP
org.apache.nifi.processors.standard.ListenRELP
org.apache.nifi.processors.standard.ListenSyslog
@@ -63,6 +67,8 @@ org.apache.nifi.processors.standard.ListenTCP
org.apache.nifi.processors.standard.ListenTCPRecord
org.apache.nifi.processors.standard.ListenUDP
org.apache.nifi.processors.standard.ListenUDPRecord
+org.apache.nifi.processors.standard.ListFile
+org.apache.nifi.processors.standard.ListFTP
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.LogMessage
@@ -92,10 +98,10 @@ org.apache.nifi.processors.standard.PutUDP
org.apache.nifi.processors.standard.QueryDatabaseTable
org.apache.nifi.processors.standard.QueryRecord
org.apache.nifi.processors.standard.ReplaceText
-org.apache.nifi.processors.standard.RouteText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
org.apache.nifi.processors.standard.RouteOnAttribute
org.apache.nifi.processors.standard.RouteOnContent
+org.apache.nifi.processors.standard.RouteText
org.apache.nifi.processors.standard.ScanAttribute
org.apache.nifi.processors.standard.ScanContent
org.apache.nifi.processors.standard.SegmentContent
@@ -107,13 +113,9 @@ org.apache.nifi.processors.standard.SplitXml
org.apache.nifi.processors.standard.TailFile
org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent
-org.apache.nifi.processors.standard.ValidateXml
+org.apache.nifi.processors.standard.UpdateCounter
+org.apache.nifi.processors.standard.UpdateRecord
org.apache.nifi.processors.standard.ValidateCsv
org.apache.nifi.processors.standard.ValidateRecord
+org.apache.nifi.processors.standard.ValidateXml
org.apache.nifi.processors.standard.Wait
-org.apache.nifi.processors.standard.ExecuteSQL
-org.apache.nifi.processors.standard.FetchDistributedMapCache
-org.apache.nifi.processors.standard.ListFTP
-org.apache.nifi.processors.standard.FetchFTP
-org.apache.nifi.processors.standard.UpdateCounter
-org.apache.nifi.processors.standard.UpdateRecord
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashAttributeTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashAttributeTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashAttributeTest.groovy
new file mode 100644
index 0000000..7e588c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashAttributeTest.groovy
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+
+import org.apache.nifi.security.util.crypto.HashAlgorithm
+import org.apache.nifi.security.util.crypto.HashService
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import java.nio.charset.Charset
+import java.nio.charset.StandardCharsets
+import java.security.Security
+
+@RunWith(JUnit4.class)
+class CryptographicHashAttributeTest extends GroovyTestCase {
+ private static final Logger logger = LoggerFactory.getLogger(CryptographicHashAttributeTest.class)
+
+
+ /**
+ * Runs once before all tests in this class. Registers the BouncyCastle JCE provider
+ * (presumably so digests such as SHA3-* resolve on JVMs that lack them — verify) and
+ * installs a catch-all methodMissing on this class's logger.
+ */
+ @BeforeClass
+ static void setUpOnce() throws Exception {
+ Security.addProvider(new BouncyCastleProvider())
+
+ // Any undefined method called on the logger (e.g. logger.expected(...)) is logged at INFO
+ // as "[METHODNAME] arg1 arg2 ..."
+ logger.metaClass.methodMissing = { String name, args ->
+ logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+ }
+ }
+
+ @Before
+ void setUp() throws Exception {
+ // No per-test setup is currently performed; each test creates its own TestRunner
+ }
+
+ @After
+ void tearDown() throws Exception {
+ // No per-test cleanup is currently performed
+ }
+
+    /**
+     * For every {@link HashAlgorithm}, maps each source attribute to a "<attribute>_<algorithm>" dynamic property.
+     * Verifies that the flowfile routes to success and that each destination attribute holds the expected hash
+     * of the source attribute's value.
+     */
+    @Test
+    void testShouldCalculateHashOfPresentAttribute() {
+        // Arrange
+        def algorithms = HashAlgorithm.values()
+
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
+
+        // Create attributes for username and date.
+        // "yyyy" is the calendar year; "YYYY" would be the week-based year, which is wrong around New Year.
+        def attributes = [
+                username: "alopresto",
+                date    : new Date().format("yyyy-MM-dd HH:mm:ss.SSS Z")
+        ]
+        def attributeKeys = attributes.keySet()
+
+        algorithms.each { HashAlgorithm algorithm ->
+            final EXPECTED_USERNAME_HASH = HashService.hashValue(algorithm, attributes["username"])
+            logger.expected("${algorithm.name.padLeft(11)}(${attributes["username"]}) = ${EXPECTED_USERNAME_HASH}")
+            final EXPECTED_DATE_HASH = HashService.hashValue(algorithm, attributes["date"])
+            logger.expected("${algorithm.name.padLeft(11)}(${attributes["date"]}) = ${EXPECTED_DATE_HASH}")
+
+            // Reset the processor
+            runner.clearProperties()
+            runner.clearProvenanceEvents()
+            runner.clearTransferState()
+
+            // Set the algorithm
+            logger.info("Setting hash algorithm to ${algorithm.name}")
+            runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithm.name)
+
+            // Add the desired dynamic properties: source attribute -> "<attribute>_<algorithm>" destination
+            attributeKeys.each { String attr ->
+                runner.setProperty(attr, "${attr}_${algorithm.name}")
+            }
+
+            // Insert the attributes in the mock flowfile
+            runner.enqueue(new byte[0], attributes)
+
+            // Act
+            runner.run(1)
+
+            // Assert
+            runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
+            runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
+
+            final List<MockFlowFile> successfulFlowfiles = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS)
+
+            // Extract the generated attributes from the flowfile
+            MockFlowFile flowFile = successfulFlowfiles.first()
+            String hashedUsername = flowFile.getAttribute("username_${algorithm.name}")
+            logger.info("flowfile.username_${algorithm.name} = ${hashedUsername}")
+            String hashedDate = flowFile.getAttribute("date_${algorithm.name}")
+            logger.info("flowfile.date_${algorithm.name} = ${hashedDate}")
+
+            assert hashedUsername == EXPECTED_USERNAME_HASH
+            assert hashedDate == EXPECTED_DATE_HASH
+        }
+    }
+
+    /**
+     * With the partial-attribute routing policy left unset, checks for every {@link HashAlgorithm} that:
+     * the empty "username" attribute is hashed (as the empty string), no hash attribute is written for
+     * the null-valued "date" attribute, and the flowfile routes to success.
+     */
+    @Test
+    void testShouldCalculateHashOfMissingAttribute() {
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
+
+        // Source attributes: username is present but empty, date is present with a null value
+        def attributes = [
+                username: "",
+                date    : null
+        ]
+
+        for (HashAlgorithm algorithm : HashAlgorithm.values()) {
+            final String algorithmName = algorithm.name
+            final String usernameHashKey = "username_${algorithmName}"
+            final String dateHashKey = "date_${algorithmName}"
+
+            // Expectations: the empty username still hashes; the null date yields no attribute
+            final String expectedUsernameHash = HashService.hashValue(algorithm, attributes.username)
+            logger.expected("${algorithmName.padLeft(11)}(${attributes.username}) = ${expectedUsernameHash}")
+            final String expectedDateHash = null
+            logger.expected("${algorithmName.padLeft(11)}(${attributes.date}) = ${expectedDateHash}")
+
+            // Start each algorithm from a clean runner
+            runner.clearProperties()
+            runner.clearProvenanceEvents()
+            runner.clearTransferState()
+
+            logger.info("Setting hash algorithm to ${algorithmName}")
+            runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithmName)
+
+            // Each source attribute is hashed into "<attribute>_<algorithm>"
+            for (String attr : attributes.keySet()) {
+                runner.setProperty(attr, "${attr}_${algorithmName}")
+            }
+
+            runner.enqueue(new byte[0], attributes)
+
+            // Act
+            runner.run(1)
+
+            // Assert: routed to success even though one attribute is missing
+            runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
+            runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS).first()
+            String hashedUsername = flowFile.getAttribute(usernameHashKey)
+            logger.info("flowfile.${usernameHashKey} = ${hashedUsername}")
+            String hashedDate = flowFile.getAttribute(dateHashKey)
+            logger.info("flowfile.${dateHashKey} = ${hashedDate}")
+
+            assert hashedUsername == expectedUsernameHash
+            assert hashedDate == expectedDateHash
+        }
+    }
+
+    /**
+     * With the partial-attribute routing policy set to PROHIBIT, checks for every {@link HashAlgorithm} that a
+     * flowfile whose "date" attribute is null routes to failure and carries none of the destination hash attributes.
+     */
+    @Test
+    void testShouldRouteToFailureOnProhibitedMissingAttribute() {
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashAttribute())
+
+        // Source attributes: username is present but empty, date is present with a null value
+        def attributes = [
+                username: "",
+                date    : null
+        ]
+
+        for (HashAlgorithm algorithm : HashAlgorithm.values()) {
+            final String algorithmName = algorithm.name
+
+            // Hashes are only logged here; the flowfile is expected to fail before any are written
+            final String expectedUsernameHash = HashService.hashValue(algorithm, attributes.username)
+            logger.expected("${algorithmName.padLeft(11)}(${attributes.username}) = ${expectedUsernameHash}")
+            final String expectedDateHash = null
+            logger.expected("${algorithmName.padLeft(11)}(${attributes.date}) = ${expectedDateHash}")
+
+            // Start each algorithm from a clean runner
+            runner.clearProperties()
+            runner.clearProvenanceEvents()
+            runner.clearTransferState()
+
+            logger.info("Setting hash algorithm to ${algorithmName}")
+            runner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, algorithmName)
+
+            // Any missing source attribute must send the flowfile to failure
+            runner.setProperty(CryptographicHashAttribute.PARTIAL_ATTR_ROUTE_POLICY, CryptographicHashAttribute.PartialAttributePolicy.PROHIBIT.name())
+
+            // Each source attribute would be hashed into "<attribute>_<algorithm>"
+            for (String attr : attributes.keySet()) {
+                runner.setProperty(attr, "${attr}_${algorithmName}")
+            }
+
+            runner.enqueue(new byte[0], attributes)
+
+            // Act
+            runner.run(1)
+
+            // Assert: routed to failure
+            runner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 1)
+            runner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 0)
+
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_FAILURE).first()
+            logger.info("Failed flowfile has attributes ${flowFile.attributes}")
+
+            // No destination hash attribute may be present on the failed flowfile
+            for (String missingAttribute : attributes.keySet()) {
+                flowFile.assertAttributeNotExists("${missingAttribute}_${algorithmName}")
+            }
+        }
+    }
+
+    @Test
+    void testShouldRouteToFailureOnEmptyAttributes() {
+        final TestRunner testRunner = TestRunners.newTestRunner(new CryptographicHashAttribute())
+
+        // Flowfile attributes: username is an empty string and date is null. No dynamic properties are
+        // registered in this test, so no hash attributes are configured for the processor to produce.
+        def flowFileAttributes = [username: "", date: null]
+        def attributeNames = flowFileAttributes.keySet()
+
+        for (HashAlgorithm hashAlgorithm in HashAlgorithm.values()) {
+            // Start every iteration from a clean runner
+            testRunner.clearProperties()
+            testRunner.clearProvenanceEvents()
+            testRunner.clearTransferState()
+
+            logger.info("Setting hash algorithm to ${hashAlgorithm.name}")
+            testRunner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, hashAlgorithm.name)
+
+            // Route to failure when all attributes are missing
+            testRunner.setProperty(CryptographicHashAttribute.FAIL_WHEN_EMPTY, "true")
+
+            testRunner.enqueue(new byte[0], flowFileAttributes)
+            testRunner.run(1)
+
+            // Exactly one flowfile is expected on failure and none on success
+            testRunner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 1)
+            testRunner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 0)
+
+            MockFlowFile rejected = testRunner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_FAILURE).first()
+            logger.info("Failed flowfile has attributes ${rejected.attributes}")
+
+            // None of the would-be hash attributes may have been written
+            for (String name in attributeNames) {
+                rejected.assertAttributeNotExists("${name}_${hashAlgorithm.name}")
+            }
+        }
+    }
+
+    @Test
+    void testShouldCalculateHashWithVariousCharacterEncodings() {
+        final TestRunner testRunner = TestRunners.newTestRunner(new CryptographicHashAttribute())
+
+        // A single attribute, hashed with MD5 under several character sets
+        def flowFileAttributes = [test_attribute: "apachenifi"]
+        def attributeNames = flowFileAttributes.keySet()
+        final String input = flowFileAttributes["test_attribute"]
+
+        final HashAlgorithm md5 = HashAlgorithm.MD5
+
+        List<Charset> encodings = [StandardCharsets.UTF_8, StandardCharsets.UTF_16, StandardCharsets.UTF_16LE, StandardCharsets.UTF_16BE]
+
+        // Reference MD5 digests keyed by normalized charset name. Plain UTF-16 shares the UTF-16BE digest
+        // because UTF-16 input is hashed as UTF-16BE so that no byte-order mark is prepended.
+        final def knownMd5Digests = [
+                "utf_8"   : "a968b5ec1d52449963dcc517789baaaf",
+                "utf_16"  : "b8413d18f7e64042bb0322a1cd61eba2",
+                "utf_16be": "b8413d18f7e64042bb0322a1cd61eba2",
+                "utf_16le": "91c3b67f9f8ae77156f21f271cc09121",
+        ]
+        for (Map.Entry<String, String> entry in knownMd5Digests.entrySet()) {
+            logger.expected("MD5(${entry.key.padLeft(9)}(${input})) = ${entry.value}")
+        }
+
+        for (Charset cs in encodings) {
+            // Compute the expected digest for this character set
+            final String expectedDigest = HashService.hashValue(md5, input, cs)
+            logger.expected("${md5.name}(${input}, ${cs.name()}) = ${expectedDigest}")
+
+            // Sanity-check the service against the hard-coded reference digest
+            final String referenceKey = translateEncodingToMapKey(cs.name())
+            assert expectedDigest == knownMd5Digests[referenceKey]
+
+            // Start every iteration from a clean runner
+            testRunner.clearProperties()
+            testRunner.clearProvenanceEvents()
+            testRunner.clearTransferState()
+
+            logger.info("Setting hash algorithm to ${md5.name}")
+            testRunner.setProperty(CryptographicHashAttribute.HASH_ALGORITHM, md5.name)
+
+            logger.info("Setting character set to ${cs.name()}")
+            testRunner.setProperty(CryptographicHashAttribute.CHARACTER_SET, cs.name())
+
+            // One dynamic property per attribute: source name -> generated hash attribute name
+            for (String name in attributeNames) {
+                testRunner.setProperty(name, "${name}_${md5.name}")
+            }
+
+            testRunner.enqueue(new byte[0], flowFileAttributes)
+            testRunner.run(1)
+
+            testRunner.assertTransferCount(CryptographicHashAttribute.REL_FAILURE, 0)
+            testRunner.assertTransferCount(CryptographicHashAttribute.REL_SUCCESS, 1)
+
+            MockFlowFile hashed = testRunner.getFlowFilesForRelationship(CryptographicHashAttribute.REL_SUCCESS).first()
+            String actualDigest = hashed.getAttribute("test_attribute_${md5.name}")
+            logger.info("flowfile.test_attribute_${md5.name} = ${actualDigest}")
+
+            assert actualDigest == expectedDigest
+        }
+    }
+
+    /**
+     * Normalizes a charset name into the key format used by the expected-digest maps: lower case, with
+     * {@code -} and {@code /} replaced by {@code _} (e.g. {@code UTF-16BE} becomes {@code utf_16be}).
+     * Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM default locale
+     * (under a Turkish default locale, for example, {@code "ISO"} would not lower-case to {@code "iso"}).
+     *
+     * @param charsetName a charset name, such as the value of {@code Charset.name()}
+     * @return the normalized map key
+     */
+    static String translateEncodingToMapKey(String charsetName) {
+        charsetName.toLowerCase(Locale.ROOT).replaceAll(/[-\/]/, '_')
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/cd687740/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashContentTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashContentTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashContentTest.groovy
new file mode 100644
index 0000000..ec25594
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CryptographicHashContentTest.groovy
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+
+import org.apache.nifi.security.util.crypto.HashAlgorithm
+import org.apache.nifi.security.util.crypto.HashService
+import org.apache.nifi.util.MockFlowFile
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import java.nio.charset.StandardCharsets
+import java.security.Security
+
+@RunWith(JUnit4.class)
+class CryptographicHashContentTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(CryptographicHashContentTest.class)
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        Security.addProvider(new BouncyCastleProvider())
+
+        // Allow ad-hoc tagged log calls such as logger.expected(...): any unknown method name is logged
+        // at INFO level, prefixed with the upper-cased method name
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    /**
+     * Resets {@code runner}, configures {@code algorithm}, enqueues one flowfile with the given content and
+     * attributes, runs the processor once, and asserts that the flowfile was routed to success.
+     *
+     * @param runner     the test runner; its properties and transfer/provenance state are cleared first
+     * @param algorithm  the hash algorithm to configure
+     * @param content    the flowfile content
+     * @param attributes the initial flowfile attributes
+     * @return the single flowfile transferred to {@code REL_SUCCESS}
+     */
+    private static MockFlowFile runExpectingSuccess(TestRunner runner, HashAlgorithm algorithm, byte[] content, Map<String, String> attributes) {
+        // Reset the processor so properties and results from a previous run do not leak into this one
+        runner.clearProperties()
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
+
+        // Set the algorithm
+        logger.info("Setting hash algorithm to ${algorithm.name}")
+        runner.setProperty(CryptographicHashContent.HASH_ALGORITHM, algorithm.name)
+
+        // Act
+        runner.enqueue(content, attributes)
+        runner.run(1)
+
+        // Assert routing
+        runner.assertTransferCount(CryptographicHashContent.REL_FAILURE, 0)
+        runner.assertTransferCount(CryptographicHashContent.REL_SUCCESS, 1)
+
+        return runner.getFlowFilesForRelationship(CryptographicHashContent.REL_SUCCESS).first()
+    }
+
+    /**
+     * Asserts that {@code flowFile} carries a {@code content_<algorithm name>} attribute equal to {@code expectedHash}.
+     *
+     * @param flowFile     the flowfile produced by the processor
+     * @param algorithm    the algorithm whose hash attribute is checked
+     * @param expectedHash the expected hex hash value
+     * @return the hash attribute value found on the flowfile
+     */
+    private static String assertContentHash(MockFlowFile flowFile, HashAlgorithm algorithm, String expectedHash) {
+        String hashAttribute = "content_${algorithm.name}"
+        flowFile.assertAttributeExists(hashAttribute)
+
+        String hashedContent = flowFile.getAttribute(hashAttribute)
+        logger.info("flowfile.${hashAttribute} = ${hashedContent}")
+
+        assert hashedContent == expectedHash
+        return hashedContent
+    }
+
+    @Test
+    void testShouldCalculateHashOfPresentContent() {
+        // Arrange
+        def algorithms = HashAlgorithm.values()
+
+        // Generate some long content (~90 KB of ASCII text)
+        final String LONG_CONTENT = "apachenifi " * 8192
+        // Hash exactly the bytes that are enqueued (explicit UTF-8, not the platform default charset)
+        final byte[] LONG_CONTENT_BYTES = LONG_CONTENT.getBytes(StandardCharsets.UTF_8)
+
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())
+
+        algorithms.each { HashAlgorithm algorithm ->
+            final String EXPECTED_CONTENT_HASH = HashService.hashValueStreaming(algorithm, new ByteArrayInputStream(LONG_CONTENT_BYTES))
+            logger.info("Expected ${algorithm.name.padLeft(11)}: ${EXPECTED_CONTENT_HASH}")
+
+            // Act
+            MockFlowFile flowFile = runExpectingSuccess(runner, algorithm, LONG_CONTENT_BYTES,
+                    [size: LONG_CONTENT.length() as String])
+
+            // Assert
+            assertContentHash(flowFile, algorithm, EXPECTED_CONTENT_HASH)
+        }
+    }
+
+    @Test
+    void testShouldCalculateHashOfEmptyContent() {
+        // Arrange
+        def algorithms = HashAlgorithm.values()
+
+        final String EMPTY_CONTENT = ""
+        final byte[] EMPTY_CONTENT_BYTES = EMPTY_CONTENT.getBytes(StandardCharsets.UTF_8)
+
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())
+
+        algorithms.each { HashAlgorithm algorithm ->
+            final String EXPECTED_CONTENT_HASH = HashService.hashValueStreaming(algorithm, new ByteArrayInputStream(EMPTY_CONTENT_BYTES))
+            logger.info("Expected ${algorithm.name.padLeft(11)}: ${EXPECTED_CONTENT_HASH}")
+
+            // Act
+            MockFlowFile flowFile = runExpectingSuccess(runner, algorithm, EMPTY_CONTENT_BYTES, [size: "0"])
+
+            // Assert
+            assertContentHash(flowFile, algorithm, EXPECTED_CONTENT_HASH)
+        }
+    }
+
+    /**
+     * Verifies the hash is computed from the actual content even when the flowfile carries a misleading
+     * {@code size} attribute. This test works because {@link MockFlowFile} uses the actual internal
+     * {@code data.size} for {@code getSize()}, while {@code StandardFlowFileRecord} uses a separate {@code size}
+     * field. May need to use {@code flowfile.getContentClaim().getLength()}.
+     */
+    @Test
+    void testShouldCalculateHashOfContentWithIncorrectSizeAttribute() {
+        // Arrange
+        def algorithms = HashAlgorithm.values()
+
+        final String NON_EMPTY_CONTENT = "apachenifi"
+        final byte[] NON_EMPTY_CONTENT_BYTES = NON_EMPTY_CONTENT.getBytes(StandardCharsets.UTF_8)
+
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())
+
+        algorithms.each { HashAlgorithm algorithm ->
+            final String EXPECTED_CONTENT_HASH = HashService.hashValueStreaming(algorithm, new ByteArrayInputStream(NON_EMPTY_CONTENT_BYTES))
+            logger.info("Expected ${algorithm.name.padLeft(11)}: ${EXPECTED_CONTENT_HASH}")
+
+            // Act: insert the content in the mock flowfile with the wrong size attribute
+            MockFlowFile flowFile = runExpectingSuccess(runner, algorithm, NON_EMPTY_CONTENT_BYTES, [size: "0"])
+
+            // Assert
+            assertContentHash(flowFile, algorithm, EXPECTED_CONTENT_HASH)
+        }
+    }
+
+    @Test
+    void testShouldOverwriteExistingAttribute() {
+        // Arrange
+        final String NON_EMPTY_CONTENT = "apachenifi"
+        final String OLD_HASH_ATTRIBUTE_VALUE = "OLD VALUE"
+
+        HashAlgorithm algorithm = HashAlgorithm.SHA256
+
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())
+
+        final String EXPECTED_CONTENT_HASH = HashService.hashValue(algorithm, NON_EMPTY_CONTENT)
+        logger.info("Expected ${algorithm.name.padLeft(11)}: ${EXPECTED_CONTENT_HASH}")
+
+        // Pre-populate the hash attribute so the processor has to overwrite it
+        def oldAttributes = [("content_${algorithm.name}".toString()): OLD_HASH_ATTRIBUTE_VALUE]
+
+        // Act
+        MockFlowFile flowFile = runExpectingSuccess(runner, algorithm, NON_EMPTY_CONTENT.getBytes(StandardCharsets.UTF_8), oldAttributes)
+
+        // Assert
+        String hashedContent = assertContentHash(flowFile, algorithm, EXPECTED_CONTENT_HASH)
+        assert hashedContent != OLD_HASH_ATTRIBUTE_VALUE
+    }
+
+    @Test
+    void testShouldRouteToFailureOnEmptyContent() {
+        // Arrange
+        def algorithms = HashAlgorithm.values()
+
+        final String EMPTY_CONTENT = ""
+
+        final TestRunner runner = TestRunners.newTestRunner(new CryptographicHashContent())
+
+        // No hash is expected to be written, so no expected hash values are computed
+        algorithms.each { HashAlgorithm algorithm ->
+            // Reset the processor
+            runner.clearProperties()
+            runner.clearProvenanceEvents()
+            runner.clearTransferState()
+
+            // Set the failure property
+            logger.info("Setting fail when empty to true")
+            runner.setProperty(CryptographicHashContent.FAIL_WHEN_EMPTY, "true")
+
+            // Set the algorithm
+            logger.info("Setting hash algorithm to ${algorithm.name}")
+            runner.setProperty(CryptographicHashContent.HASH_ALGORITHM, algorithm.name)
+
+            // Insert the content in the mock flowfile
+            runner.enqueue(EMPTY_CONTENT.getBytes(StandardCharsets.UTF_8))
+
+            // Act
+            runner.run(1)
+
+            // Assert
+            runner.assertTransferCount(CryptographicHashContent.REL_FAILURE, 1)
+            runner.assertTransferCount(CryptographicHashContent.REL_SUCCESS, 0)
+
+            // The rejected flowfile must not carry a hash attribute
+            MockFlowFile flowFile = runner.getFlowFilesForRelationship(CryptographicHashContent.REL_FAILURE).first()
+            String hashAttribute = "content_${algorithm.name}"
+            flowFile.assertAttributeNotExists(hashAttribute)
+        }
+    }
+}