You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/28 20:07:10 UTC
nifi git commit: NIFI-3249 - UpdateAttribute performance improvements
Repository: nifi
Updated Branches:
refs/heads/master 16898668c -> 35e8bedcc
NIFI-3249 - UpdateAttribute performance improvements
This closes #1356
Signed-off-by: jpercivall <JP...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/35e8bedc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/35e8bedc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/35e8bedc
Branch: refs/heads/master
Commit: 35e8bedcc878ceb67638a50ff6c2506a7bb92c75
Parents: 1689866
Author: Bryan Rosander <br...@apache.org>
Authored: Thu Dec 22 12:02:31 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Wed Dec 28 14:54:05 2016 -0500
----------------------------------------------------------------------
.../processors/attributes/UpdateAttribute.java | 115 +++++++++++--------
.../additionalDetails.html | 6 +
2 files changed, 75 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/35e8bedc/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index 86f523a..4dee379 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -44,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@@ -144,16 +145,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
};
// static properties
+ public static final String DELETE_ATTRIBUTES_EXPRESSION_NAME = "Delete Attributes Expression";
public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder()
- .name("Delete Attributes Expression")
- .description("Regular expression for attributes to be deleted from FlowFiles.")
+ .name(DELETE_ATTRIBUTES_EXPRESSION_NAME)
+ .displayName(DELETE_ATTRIBUTES_EXPRESSION_NAME)
+ .description("Regular expression for attributes to be deleted from FlowFiles. Existing attributes that match will be deleted regardless of whether they are updated by this processor.")
.required(false)
.addValidator(DELETE_PROPERTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
+ public static final String STORE_STATE_NAME = "Store State";
public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder()
- .name("Store State")
+ .name(STORE_STATE_NAME)
+ .displayName(STORE_STATE_NAME)
.description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " +
"FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " +
"state. See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information")
@@ -161,14 +166,20 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
.allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY)
.defaultValue(DO_NOT_STORE_STATE)
.build();
+
+ public static final String STATEFUL_VARIABLES_INIT_VALUE_NAME = "Stateful Variables Initial Value";
public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder()
- .name("Stateful Variables Initial Value")
+ .name(STATEFUL_VARIABLES_INIT_VALUE_NAME)
+ .displayName(STATEFUL_VARIABLES_INIT_VALUE_NAME)
.description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " +
"when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.")
.required(false)
.addValidator(Validator.VALID)
.build();
+ private volatile Map<String, Action> defaultActions;
+ private volatile boolean debugEnabled;
+
public UpdateAttribute() {
relationships = statelessRelationshipSet;
@@ -259,6 +270,14 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
context.getStateManager().setState(tempMap, Scope.LOCAL);
}
+
+ defaultActions = getDefaultActions(context.getProperties());
+ debugEnabled = getLogger().isDebugEnabled();
+ }
+
+ @OnStopped
+ public void onStopped() {
+ defaultActions = null;
}
@Override
@@ -397,11 +416,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return;
}
- final Map<PropertyDescriptor, String> properties = context.getProperties();
-
- // get the default actions
- final Map<String, Action> defaultActions = getDefaultActions(properties);
-
// record which rule should be applied to which flow file - when operating
// in 'use clone' mode, this collection will contain a number of entries
// that map to single element lists. this is because the original flowfile
@@ -427,6 +441,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
return;
}
+ Map<String, Action> defaultActions = this.defaultActions;
+
// if there is update criteria specified, evaluate it
if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) {
// apply the actions for each rule and transfer the flowfile
@@ -493,7 +509,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
rulesForFlowFile.add(rule);
// log if appropriate
- if (logger.isDebugEnabled()) {
+ if (debugEnabled) {
logger.debug(this + " all conditions met for rule '" + rule.getName() + "'. Using flow file - " + flowfileToUse);
}
}
@@ -517,16 +533,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
private PropertyValue getPropertyValue(final String text, final ProcessContext context) {
- PropertyValue currentValue = propertyValues.get(text);
- if (currentValue == null) {
- currentValue = context.newPropertyValue(text);
- PropertyValue previousValue = propertyValues.putIfAbsent(text, currentValue);
- if (previousValue != null) {
- currentValue = previousValue;
- }
- }
-
- return currentValue;
+ return propertyValues.computeIfAbsent(text, k -> context.newPropertyValue(text));
}
// Evaluates the specified condition on the specified flowfile.
@@ -580,29 +587,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
statefulAttributesToSet = null;
}
-
// go through each action
+ boolean debugEnabled = this.debugEnabled;
for (final Action action : actions.values()) {
- if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) {
- try {
- final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
-
- // log if appropriate
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName));
- }
-
- if (statefulAttributesToSet != null) {
- if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) {
- statefulAttributesToSet.put(action.getAttribute(), newAttributeValue);
- }
- }
-
- attributesToUpdate.put(action.getAttribute(), newAttributeValue);
- } catch (final ProcessException pe) {
- throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe);
- }
- } else {
+ String attribute = action.getAttribute();
+ if (DELETE_ATTRIBUTES_EXPRESSION_NAME.equals(attribute)) {
try {
final String actionValue = action.getValue();
final String regex = (actionValue == null) ? null :
@@ -614,17 +603,43 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
if (pattern.matcher(key).matches()) {
// log if appropriate
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this,
- key, flowfile, regex));
+ if (debugEnabled) {
+ logger.debug(String.format("%s deleting attribute '%s' for %s per regex '%s'.", this, key, flowfile, regex));
}
attributesToDelete.add(key);
}
}
+ // No point in updating if they will be removed
+ attributesToUpdate.keySet().removeAll(attributesToDelete);
}
} catch (final ProcessException pe) {
- throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", action.getAttribute(), pe), pe);
+ throw new ProcessException(String.format("Unable to delete attribute '%s': %s.", attribute, pe), pe);
+ }
+ } else {
+ boolean notDeleted = !attributesToDelete.contains(attribute);
+ boolean setStatefulAttribute = statefulAttributesToSet != null && !attribute.equals("UpdateAttribute.matchedRule");
+
+ if (notDeleted || setStatefulAttribute) {
+ try {
+ final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue();
+
+ // log if appropriate
+ if (debugEnabled) {
+ logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, attribute, newAttributeValue, flowfile, ruleName));
+ }
+
+ if (setStatefulAttribute) {
+ statefulAttributesToSet.put(attribute, newAttributeValue);
+ }
+
+ // No point in updating if it will be removed
+ if (notDeleted) {
+ attributesToUpdate.put(attribute, newAttributeValue);
+ }
+ } catch (final ProcessException pe) {
+ throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", attribute, pe), pe);
+ }
}
}
}
@@ -644,7 +659,15 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable {
}
// update and delete the FlowFile attributes
- FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete);
+ FlowFile returnFlowfile = flowfile;
+
+ if (attributesToUpdate.size() > 0) {
+ returnFlowfile = session.putAllAttributes(returnFlowfile, attributesToUpdate);
+ }
+
+ if (attributesToDelete.size() > 0) {
+ returnFlowfile = session.removeAllAttributes(returnFlowfile, attributesToDelete);
+ }
if(statefulAttributesToSet != null) {
context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL);
http://git-wip-us.apache.org/repos/asf/nifi/blob/35e8bedc/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
index 8a60c8f..e2a9d83 100644
--- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html
@@ -33,6 +33,12 @@
</p>
<p>
+ Please note that "Delete Attributes Expression" supersedes any updates that occur. If an existing attribute matches the "Delete Attributes Expression", it will be removed whether it
+ was updated or not. The "Delete Attributes Expression" only applies to attributes that exist in the input FlowFile, if it is added by this processor, the "Delete Attributes Expression"
+ will not detect it.
+ </p>
+
+ <p>
<strong>Basic Usage</strong>
</p>