You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by br...@apache.org on 2016/12/29 20:51:48 UTC

nifi git commit: NIFI-1582 Fixing how failures with state are handled and improving flowfile handling

Repository: nifi
Updated Branches:
  refs/heads/master 4986b83b8 -> 8acbe9aa3


NIFI-1582 Fixing how failures with state are handled and improving flowfile handling

This closes #1371

Signed-off-by: Bryan Rosander <br...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8acbe9aa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8acbe9aa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8acbe9aa

Branch: refs/heads/master
Commit: 8acbe9aa3f72c4da3cc5d19f9827718b2ce8cc56
Parents: 4986b83
Author: jpercivall <JP...@apache.org>
Authored: Thu Dec 29 13:45:47 2016 -0500
Committer: Bryan Rosander <br...@apache.org>
Committed: Thu Dec 29 15:49:52 2016 -0500

----------------------------------------------------------------------
 .../processors/attributes/UpdateAttribute.java  | 130 +++++++++-------
 .../additionalDetails.html                      | 123 ++++++++++-----
 .../update/attributes/TestUpdateAttribute.java  | 151 ++++++++++++++++++-
 3 files changed, 312 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8acbe9aa/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 4dee379..699c8e2 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,7 +45,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -86,10 +86,9 @@ import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
 public class UpdateAttribute extends AbstractProcessor implements Searchable {
 
 
-    public static final String DO_NOT_STORE_STATE = "do not store state";
-    public static final String STORE_STATE_LOCALLY = "store state locally";
+    public static final String DO_NOT_STORE_STATE = "Do not store state";
+    public static final String STORE_STATE_LOCALLY = "Store state locally";
 
-    private boolean stateful = false;
     private final AtomicReference<Criteria> criteriaCache = new AtomicReference<>(null);
     private final ConcurrentMap<String, PropertyValue> propertyValues = new ConcurrentHashMap<>();
 
@@ -179,6 +178,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
 
     private volatile Map<String, Action> defaultActions;
     private volatile boolean debugEnabled;
+    private volatile boolean stateful = false;
 
 
     public UpdateAttribute() {
@@ -275,11 +275,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         debugEnabled = getLogger().isDebugEnabled();
     }
 
-    @OnStopped
-    public void onStopped() {
-        defaultActions = null;
-    }
-
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext context) {
         final List<ValidationResult> reasons = new ArrayList<>(super.customValidate(context));
@@ -411,8 +406,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         final ComponentLog logger = getLogger();
         final Criteria criteria = criteriaCache.get();
 
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
+        FlowFile incomingFlowFile = session.get();
+        if (incomingFlowFile == null) {
             return;
         }
 
@@ -424,59 +419,102 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         // because is the original flowfile is used for all matching rules. in this
         // case the order of the matching rules is preserved in the list
         final Map<FlowFile, List<Rule>> matchedRules = new HashMap<>();
-        Map<String, String> statefulAttributes = null;
 
-        matchedRules.clear();
+        final Map<String, String> stateInitialAttributes;
+        final Map<String, String> stateWorkingAttributes;
+        StateMap stateMap = null;
 
         try {
             if (stateful) {
-                statefulAttributes = new HashMap<>(context.getStateManager().getState(Scope.LOCAL).toMap());
+                stateMap = context.getStateManager().getState(Scope.LOCAL);
+                stateInitialAttributes = stateMap.toMap();
+                stateWorkingAttributes = new  HashMap<>(stateMap.toMap());
             } else {
-                statefulAttributes = null;
+                stateInitialAttributes = null;
+                stateWorkingAttributes = null;
             }
         } catch (IOException e) {
-            logger.error("Failed to update attributes for {} due to failing to get state; transferring FlowFile back to '{}'", new Object[]{flowFile, Relationship.SELF.getName()}, e);
-            session.transfer(flowFile);
+            logger.error("Failed to get the initial state when processing {}; transferring FlowFile back to its incoming queue", new Object[]{incomingFlowFile}, e);
+            session.transfer(incomingFlowFile);
             context.yield();
             return;
         }
 
         Map<String, Action> defaultActions = this.defaultActions;
+        List<FlowFile> flowFilesToTransfer = new LinkedList<>();
 
         // if there is update criteria specified, evaluate it
-        if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) {
+        if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
             // apply the actions for each rule and transfer the flowfile
             for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
                 FlowFile match = entry.getKey();
                 final List<Rule> rules = entry.getValue();
+                boolean updateWorking = incomingFlowFile.equals(match);
 
                 // execute each matching rule(s)
-                try {
-                    match = executeActions(session, context, rules, defaultActions, match, statefulAttributes);
-                    logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
-
-                    // transfer the match
-                    session.getProvenanceReporter().modifyAttributes(match);
-                    session.transfer(match, REL_SUCCESS);
-                } catch (IOException e) {
-                    logger.error("Failed to update attributes for {} due to a failure to set the state afterwards; transferring to '{}'", new Object[]{match, REL_FAILED_SET_STATE.getName()}, e);
-                    session.transfer(match, REL_FAILED_SET_STATE);
-                    return;
+                match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
+
+                if (updateWorking) {
+                    incomingFlowFile = match;
                 }
+
+                if (debugEnabled) {
+                    logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
+                }
+
+                // add the match to the list to transfer
+                flowFilesToTransfer.add(match);
             }
         } else {
-            // transfer the flowfile to no match (that has the default actions applied)
+            // Either we're running without any rules or the FlowFile didn't match any
+            incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);
+
+            if (debugEnabled) {
+                logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
+            }
+
+            // add the flowfile to the list to transfer
+            flowFilesToTransfer.add(incomingFlowFile);
+        }
+
+        if (stateInitialAttributes != null) {
             try {
-                flowFile = executeActions(session, context, null, defaultActions, flowFile, statefulAttributes);
-                logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{flowFile, REL_SUCCESS.getName()});
-                session.getProvenanceReporter().modifyAttributes(flowFile);
-                session.transfer(flowFile, REL_SUCCESS);
+                // Able to use "equals()" since we're just checking if the map was modified at all
+                if (!stateWorkingAttributes.equals(stateInitialAttributes)) {
+
+                    boolean setState = context.getStateManager().replace(stateMap, stateWorkingAttributes, Scope.LOCAL);
+                    if (!setState) {
+                        logger.warn("Failed to update the state after successfully processing {} due to having an old version of the StateMap. This is normally due to multiple threads running at " +
+                                "once; transferring to '{}'", new Object[]{incomingFlowFile, REL_FAILED_SET_STATE.getName()});
+
+                        flowFilesToTransfer.remove(incomingFlowFile);
+                        if (flowFilesToTransfer.size() > 0){
+                            session.remove(flowFilesToTransfer);
+                        }
+
+                        session.transfer(incomingFlowFile, REL_FAILED_SET_STATE);
+                        return;
+                    }
+                }
             } catch (IOException e) {
-                logger.error("Failed to update attributes for {} due to failures setting state afterwards; transferring to '{}'", new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e);
-                session.transfer(flowFile, REL_FAILED_SET_STATE);
+                logger.error("Failed to set the state after successfully processing {} due a failure when setting the state. This is normally due to multiple threads running at " +
+                        "once; transferring to '{}'", new Object[]{incomingFlowFile, REL_FAILED_SET_STATE.getName()}, e);
+
+                flowFilesToTransfer.remove(incomingFlowFile);
+                if (flowFilesToTransfer.size() > 0){
+                    session.remove(flowFilesToTransfer);
+                }
+
+                session.transfer(incomingFlowFile, REL_FAILED_SET_STATE);
+                context.yield();
                 return;
             }
         }
+
+        for(FlowFile toTransfer: flowFilesToTransfer) {
+            session.getProvenanceReporter().modifyAttributes(toTransfer);
+        }
+        session.transfer(flowFilesToTransfer, REL_SUCCESS);
     }
 
     //Evaluates the specified Criteria on the specified flowfile. Clones the
@@ -548,7 +586,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
 
     // Executes the specified action on the specified flowfile.
     private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile,
-                                    final Map<String, String> statefulAttributes) throws IOException {
+                                    final Map<String, String> stateInitialAttributes, final Map<String, String> stateWorkingAttributes) {
             final ComponentLog logger = getLogger();
         final Map<String, Action> actions = new HashMap<>(defaultActions);
         final String ruleName = (rules == null || rules.isEmpty()) ? "default" : rules.get(rules.size() - 1).getName();
@@ -579,14 +617,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         final Map<String, String> attributesToUpdate = new HashMap<>(actions.size());
         final Set<String> attributesToDelete = new HashSet<>(actions.size());
 
-        final Map<String, String> statefulAttributesToSet;
-
-        if (statefulAttributes != null){
-            statefulAttributesToSet = new HashMap<>();
-        } else {
-            statefulAttributesToSet = null;
-        }
-
         // go through each action
         boolean debugEnabled = this.debugEnabled;
         for (final Action action : actions.values()) {
@@ -618,11 +648,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
                 }
             } else {
                 boolean notDeleted = !attributesToDelete.contains(attribute);
-                boolean setStatefulAttribute = statefulAttributesToSet != null && !attribute.equals("UpdateAttribute.matchedRule");
+                boolean setStatefulAttribute = stateInitialAttributes != null && !attribute.equals("UpdateAttribute.matchedRule");
 
                 if (notDeleted || setStatefulAttribute) {
                     try {
-                        final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
+                        final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue();
 
                         // log if appropriate
                         if (debugEnabled) {
@@ -630,7 +660,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
                         }
 
                         if (setStatefulAttribute) {
-                            statefulAttributesToSet.put(attribute, newAttributeValue);
+                            stateWorkingAttributes.put(attribute, newAttributeValue);
                         }
 
                         // No point in updating if it will be removed
@@ -669,10 +699,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
             returnFlowfile = session.removeAllAttributes(returnFlowfile, attributesToDelete);
         }
 
-        if(statefulAttributesToSet != null) {
-            context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL);
-        }
-
         return  returnFlowfile;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8acbe9aa/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
index e2a9d83..75428a6 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
@@ -34,11 +34,40 @@
 
         <p>
             Please note that "Delete Attributes Expression" supersedes any updates that occur. If an existing attribute matches the "Delete Attributes Expression", it will be removed whether it
-            was updated or not.  The "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile, if it is added by this processor, the "Delete Attributes Expression"
+            was updated or not.  That said, the "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile, if it is added by this processor, the "Delete Attributes Expression"
             will not detect it.
         </p>
 
         <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>
+            The properties in this processor are added by the user. The expression language is supported in user-added
+            properties for this processor. See the NiFi Expression Language Guide to learn how to formulate proper expression language statements to perform the desired functions.
+        </p>
+
+        <p>
+            If an Attribute is added with the name <strong>alternate.identifier</strong> and that attribute's value is a URI, an ADD_INFO Provenance Event will be registered,
+            correlating the FlowFile with the given alternate identifier.
+        </p>
+
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>success
+                <ul>
+                    <li>If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.</li>
+                </ul>
+            </li>
+            <li>set state fail
+                <ul>
+                    <li>If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.</li>
+                </ul>
+            </li>
+        </ul>
+
+        <p>
             <strong>Basic Usage</strong>
         </p>
 
@@ -261,6 +290,7 @@
             By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but
             also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be
             referenced as a stateful variable like so:
+        </p>
 
             <ul>
                 <li>Dynamic Property
@@ -271,6 +301,7 @@
                 </li>
             </ul>
 
+        <p>
             This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute.
             All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the
             flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}".
@@ -278,34 +309,34 @@
             The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to
             determine the minimum value.
 
-
             If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the
             average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the Average will always not include the most
             recent values of count and sum:
 
-            <ul>
-                <li>Count
-                    <ul>
-                        <li>key : theCount</li>
-                        <li>value : ${getStateValue("theCount"):plus(1)}</li>
-                    </ul>
-                </li>
-
-                <li>Sum
-                    <ul>
-                        <li>key : theSum</li>
-                        <li>value : ${getStateValue("theSum"):plus(${flowfileValue})}</li>
-                    </ul>
-                </li>
+        </p>
+        <ul>
+            <li>Count
+                <ul>
+                    <li>key : theCount</li>
+                    <li>value : ${getStateValue("theCount"):plus(1)}</li>
+                </ul>
+            </li>
 
-                <li>Average
-                    <ul>
-                        <li>key : theAverage</li>
-                        <li>value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}</li>
-                    </ul>
-                </li>
-            </ul>
+            <li>Sum
+                <ul>
+                    <li>key : theSum</li>
+                    <li>value : ${getStateValue("theSum"):plus(${flowfileValue})}</li>
+                </ul>
+            </li>
 
+            <li>Average
+                <ul>
+                    <li>key : theAverage</li>
+                    <li>value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}</li>
+                </ul>
+            </li>
+        </ul>
+        <p>
             Instead, since average only relies on theCount and theSum attributes (which are added to the FlowFile as well) there should be a following Stateless UpdateAttribute which properly
             calculates the average.
 
@@ -319,33 +350,47 @@
         </p>
 
         <p>
-            <strong>Properties:</strong>
-        </p>
-        <p>
-            The properties in this processor are added by the user. The expression language is supported in user-added 
-            properties for this processor. See the NiFi Expression Language Guide to learn how to formulate proper expression language statements to perform the desired functions.
+            <strong>Combining the Advanced Usage with Stateful</strong>
         </p>
 
         <p>
-            If an Attribute is added with the name <strong>alternate.identifier</strong> and that attribute's value is a URI, an ADD_INFO Provenance Event will be registered,
-            correlating the FlowFile with the given alternate identifier.
+            The UpdateAttribute processor allows you to use both advanced usage changes (i.e., conditional) in addition to storing the values in state at the same time. This allows UpdateAttribute to
+            act as a stateful rules engine to enable powerful concepts such as a Finite-State machine or keeping track of a min/max value.
+
+            Working with both is relatively simple, when the processor would normally update an attribute on the processor (ie. it matches a conditional rule) the same update is stored to state.
+            Referencing state via the advanced tab is done in the same way too, using "getStateValue".
+
+            Note: In the event the "use clone" policy is set and the state is failed to set, no clones will be generated and only the original FlowFile will be transferred to "set state fail".
         </p>
 
         <p>
-            <strong>Relationships:</strong>
+            <strong>Notes about Concurrency and Stateful Usage</strong></p>
+        <p>
+            When using the stateful option, concurrent tasks should be used with caution. If every incoming FlowFile will update state then it will be much more efficient to have only one
+            task. This is because the first thing the onTrigger does is get the state and the last thing it does is store the state if there are an updates. If it does not have the most up to date
+            initial state when it goes to update it will fail and send the FlowFile to "set state fail". This is done so that the update is successful when it was done with the most recent information.
+            If it didn't do it in this mock-atomic way, there'd be no guarantee that the state is accurate.
+
+            When considering Concurrency, the use-cases generally fall into one of three categories:
         </p>
         <ul>
-            <li>success
-                <ul>
-                    <li>If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.</li>
-                </ul>
+            <li>A data stream where each FlowFile updates state ex. updating a counter
             </li>
-            <li>set state fail
-                <ul>
-                    <li>If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.</li>
-                </ul>
+
+            <li>A data stream where a FlowFile doesn't always update state ex. a Finite-State machine
+            </li>
+
+            <li>A data stream that doesn't update state, and a second "control" stream that one updates every time but is rare compared to the data stream ex. a trigger
             </li>
         </ul>
+        <p>
+            The first and last cases are relatively clear-cut in their guidance. For the first, concurrency should not be used. Doing so will just waste CPU and any benefits of concurrency will be wiped
+            due to misses in state. For the last case, it can easily be done using concurrency. Since updates are rare in the first place it will be even more rare that two updates are processed at
+            the same time that cause problems.
+
+            The second case is a bit of a grey area. If updates are rare then concurrency can probably be used. If updates are frequent then concurrency would probably cause more problems than benefits.
+            Regardless, testing to determine the appropriate tuning is the only true answer.
+        </p>
 
     </body>
 </html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8acbe9aa/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
index 1812a50..5c00a20 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
@@ -19,6 +19,7 @@ package org.apache.nifi.update.attributes;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -26,7 +27,10 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.regex.PatternSyntaxException;
 
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processors.attributes.UpdateAttribute;
+import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.update.attributes.serde.CriteriaSerDe;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -153,6 +157,43 @@ public class TestUpdateAttribute {
     }
 
     @Test
+    public void testStateFailures() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        final UpdateAttribute processor = (UpdateAttribute) runner.getProcessor();
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        MockStateManager mockStateManager = runner.getStateManager();
+
+        runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
+        runner.setProperty("count", "${getStateValue('count'):plus(1)}");
+        runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}");
+        runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
+
+        processor.onScheduled(runner.getProcessContext());
+
+        final Map<String, String> attributes2 = new HashMap<>();
+        attributes2.put("pencils", "2");
+
+        mockStateManager.setFailOnStateGet(Scope.LOCAL, true);
+
+        runner.enqueue(new byte[0],attributes2);
+        processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession());
+
+        runner.assertQueueNotEmpty();
+
+        mockStateManager.setFailOnStateGet(Scope.LOCAL, false);
+        mockStateManager.setFailOnStateSet(Scope.LOCAL, true);
+
+        processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession());
+
+        runner.assertQueueEmpty();
+
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_FAILED_SET_STATE, 1);
+        runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("count", "1");
+        runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("sum", "2");
+    }
+
+
+    @Test
     public void testStateWithInitValue() throws Exception {
         final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
         runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
@@ -219,6 +260,99 @@ public class TestUpdateAttribute {
     }
 
     @Test
+    public void testStateFailuresWithRulesUsingOriginal() throws Exception {
+        final Criteria criteria = getCriteria();
+        criteria.setFlowFilePolicy(FlowFilePolicy.USE_ORIGINAL);
+        addRule(criteria, "rule", Collections.singletonList(
+                // conditions
+                "${getStateValue('maxValue'):lt(${value})}"), getMap(
+                // actions
+                "maxValue", "${value}"));
+        addRule(criteria, "rule2", Collections.singletonList(
+                // conditions
+                "${getStateValue('maxValue2'):lt(${value})}"), getMap(
+                // actions
+                "maxValue2", "${value}"));
+
+        TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        final UpdateAttribute processor = (UpdateAttribute) runner.getProcessor();
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        MockStateManager mockStateManager = runner.getStateManager();
+
+        runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
+        runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
+        runner.setAnnotationData(serialize(criteria));
+
+
+        processor.onScheduled(runner.getProcessContext());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("value", "1");
+        runner.enqueue(new byte[0], attributes);
+
+        mockStateManager.setFailOnStateGet(Scope.LOCAL, true);
+        processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession());
+
+        runner.assertQueueNotEmpty();
+        mockStateManager.setFailOnStateGet(Scope.LOCAL, false);
+        mockStateManager.setFailOnStateSet(Scope.LOCAL, true);
+
+        processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession());
+
+        runner.assertQueueEmpty();
+
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_FAILED_SET_STATE, 1);
+        runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("maxValue", "1");
+        runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("maxValue2", "1");
+    }
+
+    @Test
+    public void testStateFailuresWithRulesUsingClone() throws Exception {
+        final Criteria criteria = getCriteria();
+        criteria.setFlowFilePolicy(FlowFilePolicy.USE_CLONE);
+        addRule(criteria, "rule", Collections.singletonList(
+                // conditions
+                "${getStateValue('maxValue'):lt(${value})}"), getMap(
+                // actions
+                "maxValue", "${value}"));
+        addRule(criteria, "rule2", Collections.singletonList(
+                // conditions
+                "${getStateValue('maxValue2'):lt(${value})}"), getMap(
+                // actions
+                "maxValue2", "${value}"));
+
+        TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        final UpdateAttribute processor = (UpdateAttribute) runner.getProcessor();
+        final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
+        MockStateManager mockStateManager = runner.getStateManager();
+
+        runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
+        runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
+        runner.setAnnotationData(serialize(criteria));
+
+
+        processor.onScheduled(runner.getProcessContext());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("value", "1");
+        runner.enqueue(new byte[0], attributes);
+
+        mockStateManager.setFailOnStateGet(Scope.LOCAL, true);
+        processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession());
+
+        runner.assertQueueNotEmpty();
+        mockStateManager.setFailOnStateGet(Scope.LOCAL, false);
+        mockStateManager.setFailOnStateSet(Scope.LOCAL, true);
+
+        processor.onTrigger(runner.getProcessContext(), processSessionFactory.createSession());
+
+        runner.assertQueueEmpty();
+
+        runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_FAILED_SET_STATE, 1);
+        runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeEquals("maxValue", "1");
+        runner.getFlowFilesForRelationship(UpdateAttribute.REL_FAILED_SET_STATE).get(0).assertAttributeNotExists("maxValue2");
+    }
+    @Test
     public void testRuleHitWithStateWithDefault() throws Exception {
         final Criteria criteria = getCriteria();
         addRule(criteria, "rule", Arrays.asList(
@@ -289,11 +423,17 @@ public class TestUpdateAttribute {
     @Test
     public void testMultipleRulesWithStateAndDelete() throws Exception {
         final Criteria criteria = getCriteria();
-        addRule(criteria, "rule", Arrays.asList(
+        criteria.setFlowFilePolicy(FlowFilePolicy.USE_ORIGINAL);
+        addRule(criteria, "rule", Collections.singletonList(
                 // conditions
                 "${getStateValue('maxValue'):lt(${value})}"), getMap(
                 // actions
                 "maxValue", "${value}"));
+        addRule(criteria, "rule2", Collections.singletonList(
+                // conditions
+                "${value:mod(2):equals(0)}"), getMap(
+                // actions
+                "whatIsIt", "even"));
 
         TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
         runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY);
@@ -301,6 +441,8 @@ public class TestUpdateAttribute {
         runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0");
         runner.setAnnotationData(serialize(criteria));
         runner.setProperty("maxValue", "${getStateValue('maxValue')}");
+        runner.setProperty("whatIsIt", "odd");
+        runner.setProperty("whatWasIt", "${getStateValue('whatIsIt')}");
         runner.setProperty("theCount", "${getStateValue('theCount'):plus(1)}");
 
         final Map<String, String> attributes = new HashMap<>();
@@ -322,7 +464,14 @@ public class TestUpdateAttribute {
         final List<MockFlowFile> result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS);
         result.get(3).assertAttributeEquals("maxValue", "5");
         result.get(3).assertAttributeEquals("theCount", "4");
+
         result.get(0).assertAttributeEquals("badValue", null);
+
+        result.get(0).assertAttributeEquals("whatIsIt", "odd");
+        result.get(1).assertAttributeEquals("whatIsIt", "even");
+
+        result.get(2).assertAttributeEquals("whatWasIt", "even");
+        result.get(3).assertAttributeEquals("whatWasIt", "odd");
     }
 
     @Test