You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/07/05 06:02:38 UTC
[nifi] branch master updated: NIFI-6387 RetryFlowFile
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 44b84a6 NIFI-6387 RetryFlowFile
44b84a6 is described below
commit 44b84a678f0c0c9769351e0418ec71ab42e966da
Author: Travis Neeley <52...@users.noreply.github.com>
AuthorDate: Wed Jul 3 22:11:26 2019 -0400
NIFI-6387 RetryFlowFile
NIFI-6387 RetryFlowFile
NIFI-6387 Maximum Retries support FLOWFILE_ATTRIBUTES scope
NIFI-6387 Fixed reuses descriptions for clarity
This closes #3541.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../nifi/processors/standard/RetryFlowFile.java | 314 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../processors/standard/TestRetryFlowFile.java | 257 +++++++++++++++++
3 files changed, 572 insertions(+)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java
new file mode 100644
index 0000000..821906f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RetryFlowFile.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Bounds a retry feedback loop by counting, in a FlowFile attribute, how many times a FlowFile has
+ * passed through this processor.
+ *
+ * <p>On every trigger the counter stored in the attribute named by 'Retry Attribute' is incremented
+ * (or started at 1). While the new count does not exceed 'Maximum Retries' the FlowFile is routed to
+ * {@link #RETRY}; once it does, both counter attributes are removed, every dynamic property is added
+ * as an attribute, and the FlowFile is routed to {@link #RETRIES_EXCEEDED}. A companion attribute
+ * (the retry attribute name suffixed with {@code .uuid}) stores the identifier of the processor
+ * instance that last updated the counter; a mismatch with this instance's identifier is handled
+ * according to 'Reuse Mode'.</p>
+ */
+@Tags({"Retry", "FlowFile"})
+@CapabilityDescription("FlowFiles passed to this Processor have a 'Retry Attribute' value checked against a " +
+        "configured 'Maximum Retries' value. If the current attribute value is below the configured maximum, the " +
+        "FlowFile is passed to a retry relationship. The FlowFile may or may not be penalized in that condition. " +
+        "If the FlowFile's attribute value exceeds the configured maximum, the FlowFile will be passed to a " +
+        "'retries_exceeded' relationship. WARNING: If the incoming FlowFile has a non-numeric value in the " +
+        "configured 'Retry Attribute' attribute, it will be reset to '1'. You may choose to fail the FlowFile " +
+        "instead of performing the reset. Additional dynamic properties can be defined for any attributes you " +
+        "wish to add to the FlowFiles transferred to 'retries_exceeded'. These attributes support attribute " +
+        "expression language.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@SupportsBatching
+@SideEffectFree
+@ReadsAttribute(attribute = "Retry Attribute",
+        description = "Will read the attribute or attribute expression language result as defined in 'Retry Attribute'")
+@WritesAttributes({
+        @WritesAttribute(attribute = "Retry Attribute",
+                description = "User defined retry attribute is updated with the current retry count"),
+        @WritesAttribute(attribute = "Retry Attribute .uuid",
+                description = "User defined retry attribute with .uuid that determines what processor " +
+                        "retried the FlowFile last")
+})
+@DynamicProperty(name = "Exceeded FlowFile Attribute Key",
+        value = "The value of the attribute added to the FlowFile",
+        description = "One or more dynamic properties can be used to add attributes to FlowFiles passed to " +
+                "the 'retries_exceeded' relationship",
+        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+public class RetryFlowFile extends AbstractProcessor {
+    /** Static property descriptors, assembled once in {@link #init}. */
+    private List<PropertyDescriptor> properties;
+    /** Relationships exposed by this processor, assembled once in {@link #init}. */
+    private Set<Relationship> relationships;
+    /** Name of the counter attribute, resolved from 'Retry Attribute' in {@link #onScheduled}. */
+    private String retryAttribute;
+    /** Whether FlowFiles routed to {@link #RETRY} are penalized first. */
+    private Boolean penalizeRetried;
+    /** Whether a non-numerical counter value routes to {@link #FAILURE} instead of being reset. */
+    private Boolean failOnOverwrite;
+    /** Selected 'Reuse Mode' value: one of "fail", "warn" or "reset". */
+    private String reuseMode;
+    /** Name of the companion attribute ({@link #retryAttribute} + ".uuid"). */
+    private String lastRetriedBy;
+
+    public static final PropertyDescriptor RETRY_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("retry-attribute")
+            .displayName("Retry Attribute")
+            .description("The name of the attribute that contains the current retry count for the FlowFile. " +
+                    "WARNING: If the name matches an attribute already on the FlowFile that does not contain a " +
+                    "numerical value, the processor will either overwrite that attribute with '1' or fail " +
+                    "based on configuration.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("flowfile.retries")
+            .build();
+    public static final PropertyDescriptor MAXIMUM_RETRIES = new PropertyDescriptor.Builder()
+            .name("maximum-retries")
+            .displayName("Maximum Retries")
+            .description("The maximum number of times a FlowFile can be retried before being " +
+                    "passed to the 'retries_exceeded' relationship")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, Integer.MAX_VALUE, true))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("3")
+            .build();
+    public static final PropertyDescriptor PENALIZE_RETRIED = new PropertyDescriptor.Builder()
+            .name("penalize-retries")
+            .displayName("Penalize Retries")
+            .description("If set to 'true', this Processor will penalize input FlowFiles before passing them " +
+                    "to the 'retry' relationship. This does not apply to the 'retries_exceeded' relationship.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    public static final PropertyDescriptor FAIL_ON_OVERWRITE = new PropertyDescriptor.Builder()
+            .name("Fail on Non-numerical Overwrite")
+            .description("If the FlowFile already has the attribute defined in 'Retry Attribute' that is " +
+                    "*not* a number, fail the FlowFile instead of resetting that value to '1'")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final AllowableValue FAIL_ON_REUSE = new AllowableValue(
+            "fail",
+            "Fail on Reuse",
+            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, fail the FlowFile" +
+                    " regardless of current retry count"
+    );
+    public static final AllowableValue WARN_ON_REUSE = new AllowableValue(
+            "warn",
+            "Warn on Reuse",
+            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a warning " +
+                    "message before resetting the retry attribute and UUID for this instance"
+    );
+    public static final AllowableValue RESET_ON_REUSE = new AllowableValue(
+            "reset",
+            "Reset Reuse",
+            "If the RetryFlowFile's UUID does not match the FlowFile's retry UUID, log a debug " +
+                    "message before resetting the retry attribute and UUID for this instance"
+    );
+    public static final PropertyDescriptor REUSE_MODE = new PropertyDescriptor.Builder()
+            .name("reuse-mode")
+            .displayName("Reuse Mode")
+            .description("Defines how the Processor behaves if the retry FlowFile has a different retry UUID than " +
+                    "the instance that received the FlowFile. This generally means that the attribute was not reset " +
+                    "after being successfully retried by a previous instance of this processor.")
+            .required(true)
+            .allowableValues(FAIL_ON_REUSE, WARN_ON_REUSE, RESET_ON_REUSE)
+            .defaultValue(FAIL_ON_REUSE.getValue())
+            .build();
+
+    public static final Relationship RETRY = new Relationship.Builder()
+            .name("retry")
+            .description("Input FlowFile has not exceeded the configured maximum retry count, pass this " +
+                    "relationship back to the input Processor to create a limited feedback loop.")
+            .build();
+    public static final Relationship RETRIES_EXCEEDED = new Relationship.Builder()
+            .name("retries_exceeded")
+            .description("Input FlowFile has exceeded the configured maximum retry count, do not pass this " +
+                    "relationship back to the input Processor to terminate the limited feedback loop.")
+            .build();
+    public static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("The processor is configured such that a non-numerical value on 'Retry Attribute' " +
+                    "results in a failure instead of resetting that value to '1'. This will immediately " +
+                    "terminate the limited feedback loop. Might also include when 'Maximum Retries' contains " +
+                    "attribute expression language that does not resolve to an Integer.")
+            .autoTerminateDefault(true)
+            .build();
+
+    /**
+     * @return the unmodifiable relationship set built in {@link #init}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Builds the unmodifiable property list and relationship set returned by
+     * {@link #getSupportedPropertyDescriptors()} and {@link #getRelationships()}.
+     *
+     * @param context initialization context supplied by the framework (not read here)
+     */
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(RETRY_ATTRIBUTE);
+        props.add(MAXIMUM_RETRIES);
+        props.add(PENALIZE_RETRIED);
+        props.add(FAIL_ON_OVERWRITE);
+        props.add(REUSE_MODE);
+        this.properties = Collections.unmodifiableList(props);
+
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(RETRY);
+        rels.add(RETRIES_EXCEEDED);
+        rels.add(FAILURE);
+        this.relationships = Collections.unmodifiableSet(rels);
+    }
+
+    /**
+     * Creates the descriptor for a user-defined (dynamic) property. Each such property becomes an
+     * attribute on FlowFiles routed to {@link #RETRIES_EXCEEDED}.
+     *
+     * @param propertyDescriptorName name of the dynamic property, also used as the attribute name
+     * @return an optional, dynamic, non-empty descriptor supporting FlowFile-attribute expressions
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("Attribute " + propertyDescriptorName + " will be placed on FlowFiles " +
+                        "exceeding the retry count")
+                .required(false)
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    /**
+     * @return the unmodifiable list of static property descriptors built in {@link #init}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Caches the property values that do not depend on the individual FlowFile so that
+     * {@link #onTrigger} does not re-read them on every invocation. 'Maximum Retries' is not cached
+     * because it is evaluated against each FlowFile's attributes.
+     *
+     * @param context process context providing the configured property values
+     */
+    @OnScheduled
+    @SuppressWarnings("unused")
+    public void onScheduled(final ProcessContext context) {
+        retryAttribute = context.getProperty(RETRY_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        penalizeRetried = context.getProperty(PENALIZE_RETRIED).asBoolean();
+        failOnOverwrite = context.getProperty(FAIL_ON_OVERWRITE).asBoolean();
+        reuseMode = context.getProperty(REUSE_MODE).getValue();
+        lastRetriedBy = retryAttribute.concat(".uuid");
+    }
+
+    /**
+     * Increments the retry counter of one FlowFile and routes it to {@link #RETRY},
+     * {@link #RETRIES_EXCEEDED} or {@link #FAILURE}.
+     *
+     * @param context process context used to evaluate 'Maximum Retries' and the dynamic properties
+     * @param session session from which the FlowFile is taken and through which it is transferred
+     * @throws ProcessException declared by the overridden method; not thrown directly by this code
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowfile = session.get();
+        if (null == flowfile) {
+            return;
+        }
+
+        // The count is held as a long: adding 1 to a stored Integer.MAX_VALUE in int arithmetic wraps
+        // to a negative number that never exceeds the maximum, so the feedback loop would never end.
+        long currentRetry;
+        String retryAttributeValue = flowfile.getAttribute(retryAttribute);
+        try {
+            // Still parsed as an int so that values outside the int range keep being treated as
+            // non-numerical, exactly as before.
+            currentRetry = (null == retryAttributeValue)
+                    ? 1L
+                    : Integer.parseInt(retryAttributeValue.trim()) + 1L;
+        } catch (NumberFormatException ex) {
+            // Configured to fail if this was not a number
+            if (failOnOverwrite) {
+                getLogger().error("FlowFile {} has a non-numerical value in retry attribute '{}'. " +
+                        "Route to 'failure'", new Object[]{flowfile, retryAttribute});
+                session.transfer(flowfile, FAILURE);
+                return;
+            }
+            // reset to '1' if that wasn't the case
+            currentRetry = 1L;
+        }
+
+        // A non-blank companion UUID that differs from this instance's identifier means another
+        // processor instance last wrote the counter attribute.
+        String lastRetriedByUUID = flowfile.getAttribute(lastRetriedBy);
+        String currentInstanceUUID = getIdentifier();
+        if (!StringUtils.isBlank(lastRetriedByUUID) && !currentInstanceUUID.equals(lastRetriedByUUID)) {
+            LogLevel reuseLogLevel = LogLevel.DEBUG;
+            switch (reuseMode) {
+                case "fail":
+                    getLogger().error("FlowFile {} was previously retried with the same attribute by a " +
+                            "different processor. Route to 'failure'", new Object[]{flowfile});
+                    getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}",
+                            new Object[]{currentInstanceUUID, lastRetriedByUUID, currentRetry - 1});
+                    session.transfer(flowfile, FAILURE);
+                    return;
+                case "warn":
+                    reuseLogLevel = LogLevel.WARN;
+                    // intentional fall-through: "warn" differs from "reset" only in the log level
+                case "reset":
+                    getLogger().log(reuseLogLevel, "FlowFile {} was previously retried with the same attribute " +
+                                    "by a different processor. Reset the current retry count to '1'. Consider " +
+                                    "changing the retry attribute for this processor.",
+                            new Object[]{flowfile});
+                    // currentRetry already includes this attempt, so the previous count is one less
+                    // (same value the "fail" branch reports).
+                    getLogger().debug("Current Processor: {}, Previous Processor: {}, Previous Retry: {}",
+                            new Object[]{currentInstanceUUID, lastRetriedByUUID, currentRetry - 1});
+                    currentRetry = 1L;
+                    break;
+            }
+        }
+
+        // 'Maximum Retries' may reference FlowFile attributes, so it is resolved per FlowFile and
+        // can turn out to be missing or non-numerical at runtime.
+        Integer maximumRetries;
+        try {
+            maximumRetries = context.getProperty(MAXIMUM_RETRIES)
+                    .evaluateAttributeExpressions(flowfile)
+                    .asInteger();
+
+            if (null == maximumRetries) {
+                getLogger().warn("Could not obtain maximum retries off of FlowFile, route to 'failure'");
+                session.transfer(flowfile, FAILURE);
+                return;
+            }
+        } catch (NumberFormatException ex) {
+            getLogger().warn("Maximum Retries was not a number for this FlowFile, route to 'failure'");
+            session.transfer(flowfile, FAILURE);
+            return;
+        }
+
+        if (currentRetry > maximumRetries) {
+            // Add dynamic properties
+            for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
+                if (!descriptor.isDynamic()) {
+                    continue;
+                }
+
+                String value = context.getProperty(descriptor)
+                        .evaluateAttributeExpressions(flowfile)
+                        .getValue();
+                if (!StringUtils.isBlank(value)) {
+                    flowfile = session.putAttribute(flowfile, descriptor.getName(), value);
+                }
+            }
+
+            // Drop the bookkeeping attributes so the FlowFile leaves the loop without them.
+            flowfile = session.removeAttribute(flowfile, retryAttribute);
+            flowfile = session.removeAttribute(flowfile, lastRetriedBy);
+            session.transfer(flowfile, RETRIES_EXCEEDED);
+        } else {
+            if (penalizeRetried) {
+                // Keep the FlowFile returned by the session rather than the stale reference.
+                flowfile = session.penalize(flowfile);
+            }
+
+            // Update and transfer
+            flowfile = session.putAttribute(flowfile, retryAttribute, String.valueOf(currentRetry));
+            flowfile = session.putAttribute(flowfile, lastRetriedBy, currentInstanceUUID);
+            session.transfer(flowfile, RETRY);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index bfe1403..37bf6c2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -101,6 +101,7 @@ org.apache.nifi.processors.standard.QueryDatabaseTableRecord
org.apache.nifi.processors.standard.QueryRecord
org.apache.nifi.processors.standard.ReplaceText
org.apache.nifi.processors.standard.ReplaceTextWithMapping
+org.apache.nifi.processors.standard.RetryFlowFile
org.apache.nifi.processors.standard.RouteOnAttribute
org.apache.nifi.processors.standard.RouteOnContent
org.apache.nifi.processors.standard.RouteText
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRetryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRetryFlowFile.java
new file mode 100644
index 0000000..227e685
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRetryFlowFile.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestRetryFlowFile {
+ TestRunner runner;
+
+ @Before
+ public void before() {
+ runner = TestRunners.newTestRunner(new RetryFlowFile());
+ }
+
+ @After
+ public void after() {
+ runner.shutdown();
+ }
+
+ @Test
+ public void testNoRetryAttribute() {
+ runner.enqueue("");
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeExists("flowfile.retries.uuid");
+ mff.assertAttributeEquals("flowfile.retries", "1");
+ return true;
+ });
+ }
+
+ @Test
+ public void testRetryPenalize() {
+ runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeExists("flowfile.retries.uuid");
+ mff.assertAttributeEquals("flowfile.retries", "3");
+ Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+ return true;
+ });
+ }
+
+ @Test
+ public void testRetryClustered() {
+ runner.setClustered(true);
+ runner.setThreadCount(5);
+ for (int i = 0; i < 5; i++) {
+ runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
+ }
+ runner.run(5);
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 5);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeExists("flowfile.retries.uuid");
+ mff.assertAttributeEquals("flowfile.retries", "3");
+ Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+ return true;
+ });
+ }
+
+ @Test
+ public void testRetryNoPenalize() {
+ runner.setProperty(RetryFlowFile.PENALIZE_RETRIED, "false");
+ runner.enqueue("", Collections.singletonMap("flowfile.retries", "2"));
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries.uuid");
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeEquals("flowfile.retries", "3");
+ Assert.assertFalse("FlowFile was not penalized!", mff.isPenalized());
+ return true;
+ });
+ }
+
+ @Test
+ public void testNoFailOnOverwrite() {
+ runner.enqueue("", Collections.singletonMap("flowfile.retries", "ZZAaa"));
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries.uuid");
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeEquals("flowfile.retries", "1");
+ Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+ return true;
+ });
+ }
+
+ @Test
+ public void testFailOnOverwrite() {
+ runner.setProperty(RetryFlowFile.FAIL_ON_OVERWRITE, "true");
+ runner.enqueue("", Collections.singletonMap("flowfile.retries", "ZZAaa"));
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
+ }
+
+ @Test
+ public void testRetriesExceeded() {
+ runner.setProperty("exceeded.time", "${now():toString()}");
+ runner.setProperty("reason", "${uuid} exceeded retries");
+ runner.enqueue("", Collections.singletonMap("flowfile.retries", "3"));
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 1);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRIES_EXCEEDED, mff -> {
+ mff.assertAttributeExists("exceeded.time");
+ mff.assertAttributeExists("reason");
+ Assert.assertFalse("Expression language not evaluated!",
+ mff.getAttribute("reason").contains("${uuid}"));
+ return true;
+ });
+ }
+
+ @Test
+ public void testReuseFail() {
+ runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.FAIL_ON_REUSE.getValue());
+ Map<String, String> inputAttributes = new HashMap<>();
+ inputAttributes.put("flowfile.retries", "2");
+ inputAttributes.put("flowfile.retries.uuid", "1122334455");
+ runner.enqueue("", inputAttributes);
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
+ }
+
+ @Test
+ public void testReuseWarn() {
+ runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.WARN_ON_REUSE.getValue());
+ Map<String, String> inputAttributes = new HashMap<>();
+ inputAttributes.put("flowfile.retries", "2");
+ inputAttributes.put("flowfile.retries.uuid", "1122334455");
+ runner.enqueue("", inputAttributes);
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeEquals("flowfile.retries", "1");
+ return true;
+ });
+ }
+
+ @Test
+ public void testReuseReset() {
+ runner.setProperty(RetryFlowFile.REUSE_MODE, RetryFlowFile.RESET_ON_REUSE.getValue());
+ Map<String, String> inputAttributes = new HashMap<>();
+ inputAttributes.put("flowfile.retries", "2");
+ inputAttributes.put("flowfile.retries.uuid", "1122334455");
+ runner.enqueue("", inputAttributes);
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeEquals("flowfile.retries", "1");
+ return true;
+ });
+ }
+
+ @Test
+ public void testAlternativeAttributeMaxRetries() {
+ runner.setProperty(RetryFlowFile.MAXIMUM_RETRIES, "${retry.max}");
+ Map<String, String> attributeMap = new HashMap<>();
+ attributeMap.put("retry.max", "3");
+ attributeMap.put("flowfile.retries", "2");
+ runner.enqueue("", attributeMap);
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 1);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 0);
+
+ runner.assertAllConditionsMet(RetryFlowFile.RETRY, mff -> {
+ mff.assertAttributeExists("flowfile.retries");
+ mff.assertAttributeExists("flowfile.retries.uuid");
+ mff.assertAttributeEquals("flowfile.retries", "3");
+ Assert.assertTrue("FlowFile was not penalized!", mff.isPenalized());
+ return true;
+ });
+ }
+
+ @Test
+ public void testInvalidAlternativeAttributeMaxRetries() {
+ runner.setProperty(RetryFlowFile.MAXIMUM_RETRIES, "${retry.max}");
+ Map<String, String> attributeMap = new HashMap<>();
+ attributeMap.put("retry.max", "NiFi");
+ attributeMap.put("flowfile.retries", "2");
+ runner.enqueue("", attributeMap);
+ runner.run();
+
+ runner.assertTransferCount(RetryFlowFile.RETRY, 0);
+ runner.assertTransferCount(RetryFlowFile.RETRIES_EXCEEDED, 0);
+ runner.assertTransferCount(RetryFlowFile.FAILURE, 1);
+ }
+}