You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/28 20:07:10 UTC

nifi git commit: NIFI-3249 - UpdateAttribute performance improvements

Repository: nifi
Updated Branches:
  refs/heads/master 16898668c -> 35e8bedcc


NIFI-3249 - UpdateAttribute performance improvements

This closes #1356

Signed-off-by: jpercivall <JP...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/35e8bedc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/35e8bedc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/35e8bedc

Branch: refs/heads/master
Commit: 35e8bedcc878ceb67638a50ff6c2506a7bb92c75
Parents: 1689866
Author: Bryan Rosander <br...@apache.org>
Authored: Thu Dec 22 12:02:31 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Wed Dec 28 14:54:05 2016 -0500

----------------------------------------------------------------------
 .../processors/attributes/UpdateAttribute.java  | 115 +++++++++++--------
 .../additionalDetails.html                      |   6 +
 2 files changed, 75 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/35e8bedc/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 86f523a..4dee379 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -44,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -144,16 +145,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
     };
 
     // static properties
+    public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression";
     public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
-            .name("Delete Attributes Expression")
-            .description("Regular expression for attributes to be deleted from FlowFiles.")
+            .name(DELETE_ATTRIBUTES_EXPRESSION_NAME)
+            .displayName(DELETE_ATTRIBUTES_EXPRESSION_NAME)
+            .description("Regular expression for attributes to be deleted from FlowFiles.  Existing attributes that match will be deleted regardless of whether they are updated by this processor.")
             .required(false)
             .addValidator(DELETE_PROPERTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
 
+    public static final String STORE_STATE_NAME = "Store State";
     public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder()
-            .name("Store State")
+            .name(STORE_STATE_NAME)
+            .displayName(STORE_STATE_NAME)
             .description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " +
                     "FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " +
                     "state. See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information")
@@ -161,14 +166,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
             .allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY)
             .defaultValue(DO_NOT_STORE_STATE)
             .build();
+
+    public static final String STATEFUL_VARIABLES_INIT_VALUE_NAME = "Stateful Variables Initial Value";
     public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder()
-            .name("Stateful Variables Initial Value")
+            .name(STATEFUL_VARIABLES_INIT_VALUE_NAME)
+            .displayName(STATEFUL_VARIABLES_INIT_VALUE_NAME)
             .description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " +
                     "when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.")
             .required(false)
             .addValidator(Validator.VALID)
             .build();
 
+    private volatile Map<String, Action> defaultActions;
+    private volatile boolean debugEnabled;
+
 
     public UpdateAttribute() {
         relationships = statelessRelationshipSet;
@@ -259,6 +270,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
 
             context.getStateManager().setState(tempMap, Scope.LOCAL);
         }
+
+        defaultActions = getDefaultActions(context.getProperties());
+        debugEnabled = getLogger().isDebugEnabled();
+    }
+
+    @OnStopped
+    public void onStopped() {
+        defaultActions = null;
     }
 
     @Override
@@ -397,11 +416,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
             return;
         }
 
-        final Map<PropertyDescriptor, String> properties = context.getProperties();
-
-        // get the default actions
-        final Map<String, Action> defaultActions = getDefaultActions(properties);
-
         // record which rule should be applied to which flow file - when operating
         // in 'use clone' mode, this collection will contain a number of entries
         // that map to single element lists. this is because the original flowfile
@@ -427,6 +441,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
             return;
         }
 
+        Map<String, Action> defaultActions = this.defaultActions;
+
         // if there is update criteria specified, evaluate it
         if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) {
             // apply the actions for each rule and transfer the flowfile
@@ -493,7 +509,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
                 rulesForFlowFile.add(rule);
 
                 // log if appropriate
-                if (logger.isDebugEnabled()) {
+                if (debugEnabled) {
                     logger.debug(this + " all conditions met for rule '" + rule.getName() + "'. Using flow file - " + flowfileToUse);
                 }
             }
@@ -517,16 +533,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
     }
 
     private PropertyValue getPropertyValue(final String text, final ProcessContext context) {
-        PropertyValue currentValue = propertyValues.get(text);
-        if (currentValue == null) {
-            currentValue = context.newPropertyValue(text);
-            PropertyValue previousValue = propertyValues.putIfAbsent(text, currentValue);
-            if (previousValue != null) {
-                currentValue = previousValue;
-            }
-        }
-
-        return currentValue;
+        return propertyValues.computeIfAbsent(text, k -> context.newPropertyValue(text));
     }
 
     // Evaluates the specified condition on the specified flowfile.
@@ -580,29 +587,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
             statefulAttributesToSet = null;
         }
 
-
         // go through each action
+        boolean debugEnabled = this.debugEnabled;
         for (final Action action : actions.values()) {
-            if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
-                try {
-                    final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
-
-                    // log if appropriate
-                    if (logger.isDebugEnabled()) {
-                        logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
-                    }
-
-                    if (statefulAttributesToSet != null) {
-                        if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) {
-                            statefulAttributesToSet.put(action.getAttribute(), newAttributeValue);
-                        }
-                    }
-
-                    attributesToUpdate.put(action.getAttribute(), newAttributeValue);
-                } catch (final ProcessException pe) {
-                    throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
-                }
-            } else {
+            String attribute = action.getAttribute();
+            if (DELETE_ATTRIBUTES_EXPRESSION_NAME.equals(attribute)) {
                 try {
                     final String actionValue = action.getValue();
                     final String regex = (actionValue == null) ? null :
@@ -614,17 +603,43 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
                             if (pattern.matcher(key).matches()) {
 
                                 // log if appropriate
-                                if (logger.isDebugEnabled()) {
-                                    logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this,
-                                            key, flowfile, regex));
+                                if (debugEnabled) {
+                                    logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this, key, flowfile, regex));
                                 }
 
                                 attributesToDelete.add(key);
                             }
                         }
+                        // No point in updating if they will be removed
+                        attributesToUpdate.keySet().removeAll(attributesToDelete);
                     }
                 } catch (final ProcessException pe) {
-                    throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", action.getAttribute(), pe), pe);
+                    throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", attribute, pe), pe);
+                }
+            } else {
+                boolean notDeleted = !attributesToDelete.contains(attribute);
+                boolean setStatefulAttribute = statefulAttributesToSet != null && !attribute.equals("UpdateAttribute.matchedRule");
+
+                if (notDeleted || setStatefulAttribute) {
+                    try {
+                        final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
+
+                        // log if appropriate
+                        if (debugEnabled) {
+                            logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, attribute, newAttributeValue, flowfile, ruleName));
+                        }
+
+                        if (setStatefulAttribute) {
+                            statefulAttributesToSet.put(attribute, newAttributeValue);
+                        }
+
+                        // No point in updating if it will be removed
+                        if (notDeleted) {
+                            attributesToUpdate.put(attribute, newAttributeValue);
+                        }
+                    } catch (final ProcessException pe) {
+                        throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", attribute, pe), pe);
+                    }
                 }
             }
         }
@@ -644,7 +659,15 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
         }
 
         // update and delete the FlowFile attributes
-        FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
+        FlowFile returnFlowfile = flowfile;
+
+        if (attributesToUpdate.size() > 0) {
+            returnFlowfile = session.putAllAttributes(returnFlowfile, attributesToUpdate);
+        }
+
+        if (attributesToDelete.size() > 0) {
+            returnFlowfile = session.removeAllAttributes(returnFlowfile, attributesToDelete);
+        }
 
         if(statefulAttributesToSet != null) {
             context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL);

http://git-wip-us.apache.org/repos/asf/nifi/blob/35e8bedc/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
index 8a60c8f..e2a9d83 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
@@ -33,6 +33,12 @@
         </p>
 
         <p>
+            Please note that "Delete Attributes Expression" supersedes any updates that occur. If an existing attribute matches the "Delete Attributes Expression", it will be removed whether it
+            was updated or not.  The "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile; if an attribute is added by this processor, the "Delete Attributes Expression"
+            will not detect it.
+        </p>
+
+        <p>
             <strong>Basic Usage</strong>
         </p>