You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/07/05 06:02:38 UTC

[nifi] branch master updated: NIFI-6387 RetryFlowFile

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b84a6  NIFI-6387 RetryFlowFile
44b84a6 is described below

commit 44b84a678f0c0c9769351e0418ec71ab42e966da
Author: Travis Neeley <52...@users.noreply.github.com>
AuthorDate: Wed Jul 3 22:11:26 2019 -0400

    NIFI-6387 RetryFlowFile
    
    NIFI-6387 RetryFlowFile
    
    NIFI-6387 Maximum Retries support FLOWFILE_ATTRIBUTES scope
    
    NIFI-6387 Fixed reuses descriptions for clarity
    
    This closes #3541.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi/processors/standard/RetryFlowFile.java    | 314 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../processors/standard/TestRetryFlowFile.java     | 257 +++++++++++++++++
 3 files changed, 572 insertions(+)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java
new file mode 100644
index 0000000..821906f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Counts how many times a FlowFile has passed through this processor and routes it to 'retry' while the
+ * count is within 'Maximum Retries', or to 'retries_exceeded' once it is not.
+ * <p>
+ * The count is kept in the FlowFile attribute named by 'Retry Attribute'; a companion attribute with the
+ * suffix ".uuid" records the identifier of the processor instance that last updated the count.
+ */
+@Tags({"Retry", "FlowFile"})
+@CapabilityDescription("FlowFiles passed to this Processor have a 'Retry Attribute' value checked against a " +
+        "configured 'Maximum Retries' value. If the current attribute value is below the configured maximum, the " +
+        "FlowFile is passed to a retry relationship. The FlowFile may or may not be penalized in that condition. " +
+        "If the FlowFile's attribute value exceeds the configured maximum, the FlowFile will be passed to a " +
+        "'retries_exceeded' relationship. WARNING: If the incoming FlowFile has a non-numeric value in the " +
+        "configured 'Retry Attribute' attribute, it will be reset to '1'. You may choose to fail the FlowFile " +
+        "instead of performing the reset. Additional dynamic properties can be defined for any attributes you " +
+        "wish to add to the FlowFiles transferred to 'retries_exceeded'. These attributes support attribute " +
+        "expression language.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SupportsBatching
+@SideEffectFree
+@ReadsAttribute(attribute = "Retry Attribute",
+        description = "Will read the attribute or attribute expression language result as defined in 'Retry Attribute'")
+@WritesAttributes({
+        @WritesAttribute(attribute = "Retry Attribute",
+                description = "User defined retry attribute is updated with the current retry count"),
+        @WritesAttribute(attribute = "Retry Attribute .uuid",
+                description = "User defined retry attribute with .uuid that determines what processor " +
+                        "retried the FlowFile last")
+})
+@DynamicProperty(name = "Exceeded FlowFile Attribute Key",
+        value = "The value of the attribute added to the FlowFile",
+        description = "One or more dynamic properties can be used to add attributes to FlowFiles passed to " +
+                "the 'retries_exceeded' relationship",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+public class RetryFlowFile extends AbstractProcessor {
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    // Configuration snapshot written by onScheduled() and read by onTrigger()
+    private String retryAttribute;
+    private Boolean penalizeRetried;
+    private Boolean failOnOverwrite;
+    private String reuseMode;
+    // Name of the companion attribute holding the last retrying processor's id: retryAttribute + ".uuid"
+    private String lastRetriedBy;
+
+    public static final PropertyDescriptor RETRY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("retry-attribute")
+            .displayName("Retry Attribute")
+            .description("The name of the attribute that contains the current retry count for the FlowFile. " +
+                    "WARNING: If the name matches an attribute already on the FlowFile that does not contain a " +
+                    "numerical value, the processor will either overwrite that attribute with '1' or fail " +
+                    "based on configuration.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("flowfile.retries")
+            .build();
+    public static final PropertyDescriptor MAXIMUM_RETRIES = new PropertyDescriptor.Builder()
+            .name("maximum-retries")
+            .displayName("Maximum Retries")
+            .description("The maximum number of times a FlowFile can be retried before being " +
+                    "passed to the 'retries_exceeded' relationship")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("3")
+            .build();
+    public static final PropertyDescriptor PENALIZE_RETRIED = new PropertyDescriptor.Builder()
+            .name("penalize-retries")
+            .displayName("Penalize Retries")
+            .description("If set to 'true', this Processor will penalize input FlowFiles before passing them " +
+                    "to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    public static final PropertyDescriptor FAIL_ON_OVERWRITE = new PropertyDescriptor.Builder()
+            .name("Fail on Non-numerical Overwrite")
+            .description("If the FlowFile already has the attribute defined in 'Retry Attribute' that is " +
+                    "*not* a number, fail the FlowFile instead of resetting that value to '1'")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final AllowableValue FAIL_ON_REUSE = new AllowableValue(
+            "fail",
+            "Fail on Reuse",
+            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, fail the FlowFile" +
+                    " regardless of current retry count"
+    );
+    public static final AllowableValue WARN_ON_REUSE = new AllowableValue(
+            "warn",
+            "Warn on Reuse",
+            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a warning " +
+                    "message before resetting the retry attribute and UUID for this instance"
+    );
+    public static final AllowableValue RESET_ON_REUSE = new AllowableValue(
+            "reset",
+            "Reset Reuse",
+            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a debug " +
+                    "message before resetting the retry attribute and UUID for this instance"
+    );
+    public static final PropertyDescriptor REUSE_MODE = new PropertyDescriptor.Builder()
+            .name("reuse-mode")
+            .displayName("Reuse Mode")
+            .description("Defines how the Processor behaves if the retry FlowFile has a different retry UUID than " +
+                    "the instance that received the FlowFile. This generally means that the attribute was not reset " +
+                    "after being successfully retried by a previous instance of this processor.")
+            .required(true)
+            .allowableValues(FAIL_ON_REUSE, WARN_ON_REUSE, RESET_ON_REUSE)
+            .defaultValue(FAIL_ON_REUSE.getValue())
+            .build();
+
+    public static final Relationship RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("Input FlowFile has not exceeded the configured maximum retry count, pass this " +
+                    "relationship back to the input Processor to create a limited feedback loop.")
+            .build();
+    public static final Relationship RETRIES_EXCEEDED = new Relationship.Builder()
+            .name("retries_exceeded")
+            .description("Input FlowFile has exceeded the configured maximum retry count, do not pass this " +
+                    "relationship back to the input Processor to terminate the limited feedback loop.")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The processor is configured such that a non-numerical value on 'Retry Attribute' " +
+                    "results in a failure instead of resetting that value to '1'. This will immediately " +
+                    "terminate the limited feedback loop. Might also include when 'Maximum Retries' contains " +
+                    "attribute expression language that does not resolve to an Integer.")
+            .autoTerminateDefault(true)
+            .build();
+
+    /**
+     * @return the unmodifiable set of relationships built in {@code init}: retry, retries_exceeded, failure
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Builds the fixed property list and relationship set once and stores them as unmodifiable collections.
+     *
+     * @param context initialization context supplied by the framework; not used here
+     */
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RETRY_ATTRIBUTE);
+        props.add(MAXIMUM_RETRIES);
+        props.add(PENALIZE_RETRIED);
+        props.add(FAIL_ON_OVERWRITE);
+        props.add(REUSE_MODE);
+        this.properties = Collections.unmodifiableList(props);
+
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(RETRY);
+        rels.add(RETRIES_EXCEEDED);
+        rels.add(FAILURE);
+        this.relationships = Collections.unmodifiableSet(rels);
+    }
+
+    /**
+     * Describes a user-defined (dynamic) property. Its evaluated value is added as an attribute, under the
+     * property's name, to FlowFiles routed to 'retries_exceeded'.
+     *
+     * @param propertyDescriptorName name of the dynamic property, which is also the attribute name
+     * @return an optional, dynamic descriptor supporting FlowFile-attribute expression language
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("Attribute " + propertyDescriptorName + " will be placed on FlowFiles " +
+                        "exceeding the retry count")
+                .required(false)
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    /**
+     * @return the unmodifiable list of static properties built in {@code init}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Captures the processor-level configuration into the fields that onTrigger reads.
+     * 'Retry Attribute' is evaluated without a FlowFile, matching its VARIABLE_REGISTRY scope.
+     *
+     * @param context the context providing the configured property values
+     */
+    @OnScheduled
+    @SuppressWarnings("unused")
+    public void onScheduled(final ProcessContext context) {
+        retryAttribute = context.getProperty(RETRY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        penalizeRetried = context.getProperty(PENALIZE_RETRIED).asBoolean();
+        failOnOverwrite = context.getProperty(FAIL_ON_OVERWRITE).asBoolean();
+        reuseMode = context.getProperty(REUSE_MODE).getValue();
+        lastRetriedBy = retryAttribute.concat(".uuid");
+    }
+
+    /**
+     * Routes one FlowFile according to its retry count.
+     * <ol>
+     * <li>Reads the retry attribute and increments it; a missing value, or a non-numeric one when
+     * overwriting is allowed, yields a count of 1.</li>
+     * <li>If the FlowFile was last retried by a different processor instance, applies the configured
+     * 'Reuse Mode': route to 'failure', or log and restart the count at 1.</li>
+     * <li>Evaluates 'Maximum Retries' against the FlowFile; a null or non-numeric result routes to 'failure'.</li>
+     * <li>If the count exceeds the maximum, adds the dynamic-property attributes, removes both retry
+     * attributes and routes to 'retries_exceeded'; otherwise optionally penalizes, stores the new count
+     * and this processor's identifier, and routes to 'retry'.</li>
+     * </ol>
+     *
+     * @param context provides 'Maximum Retries' and the dynamic properties
+     * @param session the session used to fetch, update and transfer the FlowFile
+     * @throws ProcessException declared by the overridden signature; this implementation has no explicit throw
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowfile = session.get();
+        if (null == flowfile) {
+            return;
+        }
+
+        final String retryAttributeValue = flowfile.getAttribute(retryAttribute);
+        int currentRetry;
+        try {
+            currentRetry = (null == retryAttributeValue)
+                    ? 1
+                    : Integer.parseInt(retryAttributeValue.trim()) + 1;
+        } catch (NumberFormatException ex) {
+            // Configured to fail if this was not a number
+            if (failOnOverwrite) {
+                session.transfer(flowfile, FAILURE);
+                return;
+            }
+            // reset to '1' if that wasn't the case
+            currentRetry = 1;
+        }
+
+        final String lastRetriedByUUID = flowfile.getAttribute(lastRetriedBy);
+        final String currentInstanceUUID = getIdentifier();
+        if (!StringUtils.isBlank(lastRetriedByUUID) && !currentInstanceUUID.equals(lastRetriedByUUID)) {
+            // currentRetry is already incremented; for a numeric attribute the value found is one less
+            final int previousRetry = currentRetry - 1;
+            LogLevel reuseLogLevel = LogLevel.DEBUG;
+            switch (reuseMode) {
+                case "fail":
+                    getLogger().error("FlowFile {} was previously retried with the same attribute by a " +
+                            "different processor. Route to 'failure'", new Object[]{flowfile});
+                    getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}",
+                            new Object[]{currentInstanceUUID, lastRetriedByUUID, previousRetry});
+                    session.transfer(flowfile, FAILURE);
+                    return;
+                case "warn":
+                    reuseLogLevel = LogLevel.WARN;
+                    // intentional fall-through: 'warn' differs from 'reset' only in the log level used
+                case "reset":
+                    getLogger().log(reuseLogLevel, "FlowFile {} was previously retried with the same attribute " +
+                                    "by a different processor. Reset the current retry count to '1'. Consider " +
+                                    "changing the retry attribute for this processor.",
+                            new Object[]{flowfile});
+                    getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}",
+                            new Object[]{currentInstanceUUID, lastRetriedByUUID, previousRetry});
+                    currentRetry = 1;
+                    break;
+            }
+        }
+
+        Integer maximumRetries;
+        try {
+            maximumRetries = context.getProperty(MAXIMUM_RETRIES)
+                    .evaluateAttributeExpressions(flowfile)
+                    .asInteger();
+
+            if (null == maximumRetries) {
+                getLogger().warn("Could not obtain maximum retries off of FlowFile, route to 'failure'");
+                session.transfer(flowfile, FAILURE);
+                return;
+            }
+        } catch (NumberFormatException ex) {
+            getLogger().warn("Maximum Retries was not a number for this FlowFile, route to 'failure'");
+            session.transfer(flowfile, FAILURE);
+            return;
+        }
+
+        if (currentRetry > maximumRetries) {
+            // Add dynamic properties
+            for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
+                if (!descriptor.isDynamic()) {
+                    continue;
+                }
+
+                String value = context.getProperty(descriptor)
+                        .evaluateAttributeExpressions(flowfile)
+                        .getValue();
+                if (!StringUtils.isBlank(value)) {
+                    flowfile = session.putAttribute(flowfile, descriptor.getName(), value);
+                }
+            }
+
+            flowfile = session.removeAttribute(flowfile, retryAttribute);
+            flowfile = session.removeAttribute(flowfile, lastRetriedBy);
+            session.transfer(flowfile, RETRIES_EXCEEDED);
+        } else {
+            if (penalizeRetried) {
+                // keep the FlowFile returned by the session, as with putAttribute/removeAttribute
+                flowfile = session.penalize(flowfile);
+            }
+
+            // Update and transfer
+            flowfile = session.putAttribute(flowfile, retryAttribute, String.valueOf(currentRetry));
+            flowfile = session.putAttribute(flowfile, lastRetriedBy, currentInstanceUUID);
+            session.transfer(flowfile, RETRY);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index bfe1403..37bf6c2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -101,6 +101,7 @@ org.apache.nifi.processors.standard.QueryDatabaseTableRecord
 org.apache.nifi.processors.standard.QueryRecord
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping
+org.apache.nifi.processors.standard.RetryFlowFile
 org.apache.nifi.processors.standard.RouteOnAttribute
 org.apache.nifi.processors.standard.RouteOnContent
 org.apache.nifi.processors.standard.RouteText
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRetryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRetryFlowFile.java
new file mode 100644
index 0000000..227e685
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRetryFlowFile.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link RetryFlowFile}; each test gets a fresh {@link TestRunner} with default properties.
+ */
+public class TestRetryFlowFile {
+    private TestRunner runner;
+
+    @Before
+    public void before() {
+        runner = TestRunners.newTestRunner(new RetryFlowFile());
+    }
+
+    @After
+    public void after() {
+        runner.shutdown();
+    }
+
+    /** A FlowFile without the retry attribute is routed to 'retry' with the count set to 1. */
+    @Test
+    public void testNoRetryAttribute() {
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeExists("flowfile.retries.uuid");
+            mff.assertAttributeEquals("flowfile.retries", "1");
+            return true;
+        });
+    }
+
+    /** With default settings, a count of 2 is incremented to 3, penalized and routed to 'retry'. */
+    @Test
+    public void testRetryPenalize() {
+        runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeExists("flowfile.retries.uuid");
+            mff.assertAttributeEquals("flowfile.retries", "3");
+            Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+            return true;
+        });
+    }
+
+    /** Same expectations as the penalize case, with five FlowFiles, five threads and clustered mode set. */
+    @Test
+    public void testRetryClustered() {
+        runner.setClustered(true);
+        runner.setThreadCount(5);
+        for (int i = 0; i < 5; i++) {
+            runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
+        }
+        runner.run(5);
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 5);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeExists("flowfile.retries.uuid");
+            mff.assertAttributeEquals("flowfile.retries", "3");
+            Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+            return true;
+        });
+    }
+
+    /** With 'Penalize Retries' set to false, the retried FlowFile must not be penalized. */
+    @Test
+    public void testRetryNoPenalize() {
+        runner.setProperty(RetryFlowFile.PENALIZE_RETRIED, "false");
+        runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries.uuid");
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeEquals("flowfile.retries", "3");
+            Assert.assertFalse("FlowFile was penalized!", mff.isPenalized());
+            return true;
+        });
+    }
+
+    /** A non-numeric retry attribute is reset to 1 by default and the FlowFile is retried. */
+    @Test
+    public void testNoFailOnOverwrite() {
+        runner.enqueue("", Collections.singletonMap("flowfile.retries", "ZZAaa"));
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries.uuid");
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeEquals("flowfile.retries", "1");
+            Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+            return true;
+        });
+    }
+
+    /** With fail-on-overwrite enabled, a non-numeric retry attribute routes the FlowFile to 'failure'. */
+    @Test
+    public void testFailOnOverwrite() {
+        runner.setProperty(RetryFlowFile.FAIL_ON_OVERWRITE, "true");
+        runner.enqueue("", Collections.singletonMap("flowfile.retries", "ZZAaa"));
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
+    }
+
+    /** A count already at the default maximum of 3 routes to 'retries_exceeded' with dynamic attributes. */
+    @Test
+    public void testRetriesExceeded() {
+        runner.setProperty("exceeded.time", "${now():toString()}");
+        runner.setProperty("reason", "${uuid} exceeded retries");
+        runner.enqueue("", Collections.singletonMap("flowfile.retries", "3"));
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 1);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRIES_EXCEEDED, mff -> {
+            mff.assertAttributeExists("exceeded.time");
+            mff.assertAttributeExists("reason");
+            Assert.assertFalse("Expression language not evaluated!",
+                    mff.getAttribute("reason").contains("${uuid}"));
+            return true;
+        });
+    }
+
+    /** In 'fail' reuse mode, a retry UUID that is not this processor's routes the FlowFile to 'failure'. */
+    @Test
+    public void testReuseFail() {
+        runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.FAIL_ON_REUSE.getValue());
+        Map<String, String> inputAttributes = new HashMap<>();
+        inputAttributes.put("flowfile.retries", "2");
+        inputAttributes.put("flowfile.retries.uuid", "1122334455");
+        runner.enqueue("", inputAttributes);
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
+    }
+
+    /** In 'warn' reuse mode, a foreign retry UUID restarts the count at 1 and the FlowFile is retried. */
+    @Test
+    public void testReuseWarn() {
+        runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.WARN_ON_REUSE.getValue());
+        Map<String, String> inputAttributes = new HashMap<>();
+        inputAttributes.put("flowfile.retries", "2");
+        inputAttributes.put("flowfile.retries.uuid", "1122334455");
+        runner.enqueue("", inputAttributes);
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeEquals("flowfile.retries", "1");
+            return true;
+        });
+    }
+
+    /** In 'reset' reuse mode, a foreign retry UUID restarts the count at 1 and the FlowFile is retried. */
+    @Test
+    public void testReuseReset() {
+        runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.RESET_ON_REUSE.getValue());
+        Map<String, String> inputAttributes = new HashMap<>();
+        inputAttributes.put("flowfile.retries", "2");
+        inputAttributes.put("flowfile.retries.uuid", "1122334455");
+        runner.enqueue("", inputAttributes);
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeEquals("flowfile.retries", "1");
+            return true;
+        });
+    }
+
+    /** 'Maximum Retries' may be taken from a FlowFile attribute through expression language. */
+    @Test
+    public void testAlternativeAttributeMaxRetries() {
+        runner.setProperty(RetryFlowFile.MAXIMUM_RETRIES, "${retry.max}");
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("retry.max", "3");
+        attributeMap.put("flowfile.retries", "2");
+        runner.enqueue("", attributeMap);
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+        runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+            mff.assertAttributeExists("flowfile.retries");
+            mff.assertAttributeExists("flowfile.retries.uuid");
+            mff.assertAttributeEquals("flowfile.retries", "3");
+            Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+            return true;
+        });
+    }
+
+    /** A 'Maximum Retries' expression that resolves to a non-number routes the FlowFile to 'failure'. */
+    @Test
+    public void testInvalidAlternativeAttributeMaxRetries() {
+        runner.setProperty(RetryFlowFile.MAXIMUM_RETRIES, "${retry.max}");
+        Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("retry.max", "NiFi");
+        attributeMap.put("flowfile.retries", "2");
+        runner.enqueue("", attributeMap);
+        runner.run();
+
+        runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+        runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+        runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
+    }
+}