You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/01/11 17:29:33 UTC
[nifi] branch main updated: NIFI-10887: Addressed performance concerns. Use String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractConnection. Updated default Run Schedule on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d5c79fdcd1 NIFI-10887: Addressed performance concerns. Use String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractConnection. Updated default Run Schedule on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.
d5c79fdcd1 is described below
commit d5c79fdcd1c806f6376c101f392e333c4a86b805
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Nov 28 13:37:53 2022 -0500
NIFI-10887: Addressed performance concerns. Use String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractConnection. Updated default Run Schedule on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #6724
---
.../repository/StandardProcessSession.java | 9 ++-
.../service/StandardConfigurationContext.java | 13 ++-
.../controller/queue/AbstractFlowFileQueue.java | 5 ++
.../nifi/processors/standard/ReplaceText.java | 92 +++++++++++++++++-----
.../nifi/processors/standard/TestReplaceText.java | 35 ++++++++
5 files changed, 126 insertions(+), 28 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 43b8155f4e..9234d20edf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2989,7 +2989,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
ensureNotAppending(newClaim);
final OutputStream rawStream = claimCache.write(newClaim);
- final OutputStream disableOnClose = new DisableOnCloseOutputStream(rawStream);
+ final OutputStream nonFlushable = new NonFlushableOutputStream(rawStream);
+ final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushable);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose);
final FlowFile sourceFlowFile = source;
@@ -3125,7 +3126,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
ensureNotAppending(newClaim);
try (final OutputStream stream = claimCache.write(newClaim);
- final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
+ final NonFlushableOutputStream nonFlushableOutputStream = new NonFlushableOutputStream(stream);
+ final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushableOutputStream);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
try {
writeRecursionSet.add(source);
@@ -3417,7 +3419,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream os = claimCache.write(newClaim);
- final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
+ final OutputStream nonFlushableOut = new NonFlushableOutputStream(os);
+ final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(nonFlushableOut);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
writeRecursionSet.add(source);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
index 1a2e63eca8..8298c5c3dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
@@ -116,10 +116,15 @@ public class StandardConfigurationContext implements ConfigurationContext {
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);
- // We need to get the 'canonical representation' of the property descriptor from the component itself,
- // since the supplied PropertyDescriptor may not have the proper default value.
- final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
- final String resolvedValue = (configuredValue == null) ? resolvedDescriptor.getDefaultValue() : configuredValue;
+ final String resolvedValue;
+ if (configuredValue == null) {
+ // We need to get the 'canonical representation' of the property descriptor from the component itself,
+ // since the supplied PropertyDescriptor may not have the proper default value.
+ final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
+ resolvedValue = resolvedDescriptor.getDefaultValue();
+ } else {
+ resolvedValue = configuredValue;
+ }
final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), property);
return new StandardPropertyValue(resourceContext, resolvedValue, serviceLookup, component.getParameterLookup(), preparedQueries.get(property), variableRegistry);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 609734a36a..e4038603f8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -500,4 +500,9 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}
+
+ @Override
+ public int hashCode() {
+ return identifier.hashCode();
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 2b1effbc73..177eddab0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringSubstitutor;
+import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -74,11 +75,11 @@ import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
-@SupportsBatching
+@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex"})
-@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression (regex) against it and replacing the section of "
- + "the content that matches the Regular Expression with some alternate value.")
+@CapabilityDescription("Updates the content of a FlowFile by searching for some textual value in the FlowFile content (via Regular Expression/regex, or literal value) and replacing the " +
+ "section of the content that matches with some alternate value. It can also be used to append or prepend text to the contents of a FlowFile.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class ReplaceText extends AbstractProcessor {
@@ -94,6 +95,7 @@ public class ReplaceText extends AbstractProcessor {
public static final String ENTIRE_TEXT = "Entire text";
public static final String prependValue = "Prepend";
public static final String appendValue = "Append";
+ public static final String surroundValue = "Surround";
public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace";
public static final String alwaysReplace = "Always Replace";
@@ -114,6 +116,9 @@ public class ReplaceText extends AbstractProcessor {
+ "the value will be appended to each line. Similarly, for \"First-Line\", \"Last-Line\", \"Except-Last-Line\" and \"Except-First-Line\" Evaluation Modes,"
+ "the value will be appended to header alone, footer alone, all lines except header and all lines except footer respectively. For \"Entire Text\" evaluation mode,"
+ "the value will be appended to the entire text.");
+ static final AllowableValue SURROUND = new AllowableValue(surroundValue, surroundValue,
+ "Prepends text before the start of the FlowFile (or the start of each line, depending on the configuration of the Evaluation Mode property) " +
+ "as well as appending text to the end of the FlowFile (or the end of each line, depending on the configuration of the Evaluation Mode property)");
static final AllowableValue LITERAL_REPLACE = new AllowableValue(literalReplaceValue, literalReplaceValue,
"Search for all instances of the Search Value and replace the matches with the Replacement Value.");
static final AllowableValue REGEX_REPLACE = new AllowableValue(regexReplaceValue, regexReplaceValue,
@@ -127,6 +132,14 @@ public class ReplaceText extends AbstractProcessor {
"Substitute variable references (specified in ${var} form) using FlowFile attributes for looking up the replacement value by variable name. "
+ "When this strategy is chosen, both the <Search Value> and <Replacement Value> properties are ignored.");
+
+ public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Replacement Strategy")
+ .description("The strategy for how and what to replace within the FlowFile's text content.")
+ .allowableValues(PREPEND, APPEND, SURROUND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
+ .defaultValue(REGEX_REPLACE.getValue())
+ .required(true)
+ .build();
public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
.name("Regular Expression")
.displayName("Search Value")
@@ -134,6 +147,7 @@ public class ReplaceText extends AbstractProcessor {
.required(true)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE)
.defaultValue(DEFAULT_REGEX)
.build();
public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
@@ -145,6 +159,25 @@ public class ReplaceText extends AbstractProcessor {
.required(true)
.defaultValue(DEFAULT_REPLACEMENT_VALUE)
.addValidator(Validator.VALID)
+ .dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, PREPEND, APPEND)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ static final PropertyDescriptor PREPEND_TEXT = new PropertyDescriptor.Builder()
+ .name("Text to Prepend")
+ .displayName("Text to Prepend")
+ .description("The text to prepend to the start of the FlowFile, or each line, depending on teh configured value of the Evaluation Mode property")
+ .required(true)
+ .addValidator(Validator.VALID)
+ .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+ static final PropertyDescriptor APPEND_TEXT = new PropertyDescriptor.Builder()
+ .name("Text to Append")
+ .displayName("Text to Append")
+ .description("The text to append to the end of the FlowFile, or each line, depending on teh configured value of the Evaluation Mode property")
+ .required(true)
+ .addValidator(Validator.VALID)
+ .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
@@ -166,13 +199,6 @@ public class ReplaceText extends AbstractProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
- public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
- .name("Replacement Strategy")
- .description("The strategy for how and what to replace within the FlowFile's text content.")
- .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
- .defaultValue(REGEX_REPLACE.getValue())
- .required(true)
- .build();
public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
.name("Evaluation Mode")
.description("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or buffer the entire file "
@@ -191,6 +217,8 @@ public class ReplaceText extends AbstractProcessor {
.required(false)
.build();
+
+
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -209,11 +237,13 @@ public class ReplaceText extends AbstractProcessor {
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(REPLACEMENT_STRATEGY);
properties.add(SEARCH_VALUE);
properties.add(REPLACEMENT_VALUE);
+ properties.add(PREPEND_TEXT);
+ properties.add(APPEND_TEXT);
properties.add(CHARACTER_SET);
properties.add(MAX_BUFFER_SIZE);
- properties.add(REPLACEMENT_STRATEGY);
properties.add(EVALUATION_MODE);
properties.add(LINE_BY_LINE_EVALUATION_MODE);
this.properties = Collections.unmodifiableList(properties);
@@ -271,7 +301,10 @@ public class ReplaceText extends AbstractProcessor {
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
- replacementStrategyExecutor = new AppendReplace();
+ replacementStrategyExecutor = new SurroundReplace(null, REPLACEMENT_VALUE);
+ break;
+ case surroundValue:
+ replacementStrategyExecutor = new SurroundReplace(PREPEND_TEXT, APPEND_TEXT);
break;
case regexReplaceValue:
// for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
@@ -454,23 +487,39 @@ public class ReplaceText extends AbstractProcessor {
}
- private static class AppendReplace implements ReplacementStrategyExecutor {
+ private static class SurroundReplace implements ReplacementStrategyExecutor {
+ private final PropertyDescriptor prependValueDescriptor;
+ private final PropertyDescriptor appendValueDescriptor;
+
+ public SurroundReplace(final PropertyDescriptor prependValueDescriptor, final PropertyDescriptor appendValueDescriptor) {
+ this.prependValueDescriptor = prependValueDescriptor;
+ this.appendValueDescriptor = appendValueDescriptor;
+ }
@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
- final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+ final String prependValue = (prependValueDescriptor == null) ? null : context.getProperty(prependValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+ final String appendValue = context.getProperty(appendValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
+ if (prependValue != null && !prependValue.isEmpty()) {
+ out.write(prependValue.getBytes(charset));
+ }
+
IOUtils.copy(in, out);
- out.write(replacementValue.getBytes(charset));
+ out.write(appendValue.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
+ if (prependValue != null && !prependValue.isEmpty()) {
+ bw.write(prependValue);
+ }
+
// we need to find the first carriage return or new-line so that we can append the new value
// before the line separate. However, we don't want to do this using a regular expression due
// to performance concerns. So we will find the first occurrence of either \r or \n and use
@@ -484,7 +533,7 @@ public class ReplaceText extends AbstractProcessor {
}
if (c == '\r' || c == '\n') {
- bw.write(replacementValue);
+ bw.write(appendValue);
foundNewLine = true;
}
@@ -492,7 +541,7 @@ public class ReplaceText extends AbstractProcessor {
}
if (!foundNewLine) {
- bw.write(replacementValue);
+ bw.write(appendValue);
}
}));
}
@@ -641,13 +690,14 @@ public class ReplaceText extends AbstractProcessor {
int matches = 0;
int lastEnd = 0;
- final Matcher matcher = searchPattern.matcher(oneLine);
- while (matcher.find()) {
- bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
+ int index = oneLine.indexOf(searchValue, lastEnd);
+ while (index >= 0) {
+ bw.write(oneLine, lastEnd, index - lastEnd);
bw.write(replacementValue);
matches++;
- lastEnd = matcher.end();
+ lastEnd = index + searchValue.length();
+ index = oneLine.indexOf(searchValue, lastEnd);
}
if (matches > 0) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
index 1b5b6e53e5..9023b37835 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
@@ -1631,6 +1631,41 @@ public class TestReplaceText {
runner.assertValid();
}
+ @Test
+ public void testSurroundWithEntireText() {
+ final TestRunner runner = getRunner();
+ runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SURROUND);
+ runner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
+ runner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
+ runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.ENTIRE_TEXT);
+
+ final String input = "Hello\nThere\nHow are you\nToday?";
+ runner.enqueue(input);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+
+ final MockFlowFile output = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+ output.assertContentEquals("<pre>" + input + "<post>");
+ }
+
+ @Test
+ public void testSurroundLineByLine() {
+ final TestRunner runner = getRunner();
+ runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SURROUND);
+ runner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
+ runner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
+ runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
+
+ final String input = "Hello\nThere\nHow are you\nToday?";
+ runner.enqueue(input);
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+
+ final MockFlowFile output = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+ output.assertContentEquals("<pre>Hello<post>\n<pre>There<post>\n<pre>How are you<post>\n<pre>Today?<post>");
+ }
+
+
@Test
public void testBackReferenceEscapeWithRegexReplaceUsingEL() {
final TestRunner runner = getRunner();