You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/08/09 17:12:20 UTC

nifi git commit: NIFI-5448 Added failure relationship to UpdateAttributes to handle bad expression language logic.

Repository: nifi
Updated Branches:
  refs/heads/master 451084e11 -> 32ee552ad


NIFI-5448 Added failure relationship to UpdateAttributes to handle bad expression language logic.

This closes #2914

Signed-off-by: zenfenan <ze...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32ee552a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32ee552a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32ee552a

Branch: refs/heads/master
Commit: 32ee552ada328ed1189ed2bd0a2af18ed213ddc8
Parents: 451084e
Author: Mike Thomsen <mi...@gmail.com>
Authored: Tue Jul 24 08:49:16 2018 -0400
Committer: zenfenan <si...@gmail.com>
Committed: Thu Aug 9 22:41:50 2018 +0530

----------------------------------------------------------------------
 .../processors/attributes/UpdateAttribute.java  | 82 ++++++++++++++------
 .../update/attributes/TestUpdateAttribute.java  | 29 +++++++
 2 files changed, 87 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/32ee552a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 9ffd450..0147608 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -45,6 +45,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -99,17 +100,21 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .description("All successful FlowFiles are routed to this relationship").name("success").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .description("All flowfiles that cannot be updated are routed to this relationship").name("failure").autoTerminateDefault(true).build();
     public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder()
             .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build();
 
     static {
         Set<Relationship> tempStatelessSet = new HashSet<>();
         tempStatelessSet.add(REL_SUCCESS);
+        tempStatelessSet.add(REL_FAILURE);
 
         statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet);
 
         Set<Relationship> tempStatefulSet = new HashSet<>();
         tempStatefulSet.add(REL_SUCCESS);
+        tempStatefulSet.add(REL_FAILURE);
         tempStatefulSet.add(REL_FAILED_SET_STATE);
 
         statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet);
@@ -144,6 +149,21 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         }
     };
 
+    public static final AllowableValue FAIL_STOP = new AllowableValue("stop", "Penalize", "Penalize FlowFiles." +
+            "This is based on the original behavior of the processor to allow for a smooth transition.");
+    public static final AllowableValue FAIL_ROUTE = new AllowableValue("route", "Route to Failure Relationship",
+            "If chosen, failed FlowFiles will be routed to the failure relationship.");
+    public static final PropertyDescriptor FAILURE_BEHAVIOR = new PropertyDescriptor.Builder()
+        .name("update-attribute-failure-behavior")
+        .displayName("Failure Behavior")
+        .description("Control how to handle errors in Expression Language evaluation. The default behavior is to stop evaluation. It can be " +
+                "changed by the user to route to a failure relationship instead.")
+        .allowableValues(FAIL_STOP, FAIL_ROUTE)
+        .defaultValue(FAIL_STOP.getValue())
+        .required(true)
+        .build();
+
+
     // static properties
     public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression";
     public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
@@ -197,6 +217,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         descriptors.add(DELETE_ATTRIBUTES);
         descriptors.add(STORE_STATE);
         descriptors.add(STATEFUL_VARIABLES_INIT_VALUE);
+        descriptors.add(FAILURE_BEHAVIOR);
         return Collections.unmodifiableList(descriptors);
     }
 
@@ -444,40 +465,52 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         Map<String, Action> defaultActions = this.defaultActions;
         List<FlowFile> flowFilesToTransfer = new LinkedList<>();
 
-        // if there is update criteria specified, evaluate it
-        if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
-            // apply the actions for each rule and transfer the flowfile
-            for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
-                FlowFile match = entry.getKey();
-                final List<Rule> rules = entry.getValue();
-                boolean updateWorking = incomingFlowFile.equals(match);
+        boolean routeToFailure = context.getProperty(FAILURE_BEHAVIOR).getValue().equals(FAIL_ROUTE.getValue());
+        try {
+            // if there is update criteria specified, evaluate it
+            if (criteria != null && evaluateCriteria(session, context, criteria, incomingFlowFile, matchedRules, stateInitialAttributes)) {
+                // apply the actions for each rule and transfer the flowfile
+                for (final Map.Entry<FlowFile, List<Rule>> entry : matchedRules.entrySet()) {
+                    FlowFile match = entry.getKey();
+                    final List<Rule> rules = entry.getValue();
+                    boolean updateWorking = incomingFlowFile.equals(match);
+
+                    // execute each matching rule(s)
+                    match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
+
+                    if (updateWorking) {
+                        incomingFlowFile = match;
+                    }
 
-                // execute each matching rule(s)
-                match = executeActions(session, context, rules, defaultActions, match, stateInitialAttributes, stateWorkingAttributes);
+                    if (debugEnabled) {
+                        logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
+                    }
 
-                if (updateWorking) {
-                    incomingFlowFile = match;
+                    // add the match to the list to transfer
+                    flowFilesToTransfer.add(match);
                 }
+            } else {
+                // Either we're running without any rules or the FlowFile didn't match any
+                incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);
 
                 if (debugEnabled) {
-                    logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()});
+                    logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
                 }
 
-                // add the match to the list to transfer
-                flowFilesToTransfer.add(match);
+                // add the flowfile to the list to transfer
+                flowFilesToTransfer.add(incomingFlowFile);
             }
-        } else {
-            // Either we're running without any rules or the FlowFile didn't match any
-            incomingFlowFile = executeActions(session, context, null, defaultActions, incomingFlowFile, stateInitialAttributes, stateWorkingAttributes);
-
-            if (debugEnabled) {
-                logger.debug("Updated attributes for {}; transferring to '{}'", new Object[]{incomingFlowFile, REL_SUCCESS.getName()});
+        } catch (ProcessException pe) {
+            if (routeToFailure) {
+                session.transfer(incomingFlowFile, REL_FAILURE);
+                getLogger().error("Failed to update flowfile attribute(s).", pe);
+                return;
+            } else {
+                throw pe;
             }
-
-            // add the flowfile to the list to transfer
-            flowFilesToTransfer.add(incomingFlowFile);
         }
 
+
         if (stateInitialAttributes != null) {
             try {
                 // Able to use "equals()" since we're just checking if the map was modified at all
@@ -713,7 +746,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         final Map<String, Action> defaultActions = new HashMap<>();
 
         for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
-            if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) {
+            if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE
+                && entry.getKey() != FAILURE_BEHAVIOR) {
                 final Action action = new Action();
                 action.setAttribute(entry.getKey().getName());
                 action.setValue(entry.getValue());

http://git-wip-us.apache.org/repos/asf/nifi/blob/32ee552a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
index 50938e6..35b5536 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java
@@ -1005,4 +1005,33 @@ public class TestUpdateAttribute {
         }
     }
 
+    @Test
+    public void testInvalidExpressionLanguage() {
+        final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute());
+        runner.setVariable("test", "Squirrel!!1!");
+        runner.setProperty("bad_attr", "${test:toDate('yyyy-MM-dd')}");
+        runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_ROUTE);
+        runner.assertValid();
+
+        runner.enqueue("Test");
+        runner.run();
+
+        runner.assertTransferCount(UpdateAttribute.REL_SUCCESS, 0);
+        runner.assertTransferCount(UpdateAttribute.REL_FAILURE, 1);
+
+        runner.clearTransferState();
+
+        Throwable ex = null;
+        try {
+            runner.setProperty(UpdateAttribute.FAILURE_BEHAVIOR, UpdateAttribute.FAIL_STOP);
+            runner.enqueue("Test");
+            runner.run();
+        } catch (Throwable t) {
+            ex = t;
+        } finally {
+            Assert.assertNotNull(ex);
+            Assert.assertTrue(ex.getCause() instanceof ProcessException);
+            runner.assertQueueNotEmpty();
+        }
+    }
 }