You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2023/01/11 17:29:33 UTC

[nifi] branch main updated: NIFI-10887: Addressed performance concerns. Use String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractFlowFileQueue. Updated default Run Duration on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d5c79fdcd1 NIFI-10887: Addressed performance concerns. Use String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractFlowFileQueue. Updated default Run Duration on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.
d5c79fdcd1 is described below

commit d5c79fdcd1c806f6376c101f392e333c4a86b805
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Nov 28 13:37:53 2022 -0500

    NIFI-10887: Addressed performance concerns. Use String.indexOf() instead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractFlowFileQueue. Updated default Run Duration on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #6724
---
 .../repository/StandardProcessSession.java         |  9 ++-
 .../service/StandardConfigurationContext.java      | 13 ++-
 .../controller/queue/AbstractFlowFileQueue.java    |  5 ++
 .../nifi/processors/standard/ReplaceText.java      | 92 +++++++++++++++++-----
 .../nifi/processors/standard/TestReplaceText.java  | 35 ++++++++
 5 files changed, 126 insertions(+), 28 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 43b8155f4e..9234d20edf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2989,7 +2989,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             ensureNotAppending(newClaim);
 
             final OutputStream rawStream = claimCache.write(newClaim);
-            final OutputStream disableOnClose = new DisableOnCloseOutputStream(rawStream);
+            final OutputStream nonFlushable = new NonFlushableOutputStream(rawStream);
+            final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushable);
             final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose);
 
             final FlowFile sourceFlowFile = source;
@@ -3125,7 +3126,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
 
             ensureNotAppending(newClaim);
             try (final OutputStream stream = claimCache.write(newClaim);
-                final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
+                final NonFlushableOutputStream nonFlushableOutputStream = new NonFlushableOutputStream(stream);
+                final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushableOutputStream);
                 final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
                 try {
                     writeRecursionSet.add(source);
@@ -3417,7 +3419,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
                 final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
                 final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
                 final OutputStream os = claimCache.write(newClaim);
-                final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
+                final OutputStream nonFlushableOut = new NonFlushableOutputStream(os);
+                final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(nonFlushableOut);
                 final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
 
                 writeRecursionSet.add(source);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
index 1a2e63eca8..8298c5c3dc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
@@ -116,10 +116,15 @@ public class StandardConfigurationContext implements ConfigurationContext {
     public PropertyValue getProperty(final PropertyDescriptor property) {
         final String configuredValue = properties.get(property);
 
-        // We need to get the 'canonical representation' of the property descriptor from the component itself,
-        // since the supplied PropertyDescriptor may not have the proper default value.
-        final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
-        final String resolvedValue = (configuredValue == null) ? resolvedDescriptor.getDefaultValue() : configuredValue;
+        final String resolvedValue;
+        if (configuredValue == null) {
+            // We need to get the 'canonical representation' of the property descriptor from the component itself,
+            // since the supplied PropertyDescriptor may not have the proper default value.
+            final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
+            resolvedValue = resolvedDescriptor.getDefaultValue();
+        } else {
+            resolvedValue = configuredValue;
+        }
 
         final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), property);
         return new StandardPropertyValue(resourceContext, resolvedValue, serviceLookup, component.getParameterLookup(), preparedQueries.get(property), variableRegistry);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 609734a36a..e4038603f8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -500,4 +500,9 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
     public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
         return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
     }
+
+    @Override
+    public int hashCode() {
+        return identifier.hashCode();
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 2b1effbc73..177eddab0d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.text.StringSubstitutor;
+import org.apache.nifi.annotation.behavior.DefaultRunDuration;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -74,11 +75,11 @@ import java.util.regex.Pattern;
 
 @EventDriven
 @SideEffectFree
-@SupportsBatching
+@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex"})
-@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression (regex) against it and replacing the section of "
-    + "the content that matches the Regular Expression with some alternate value.")
+@CapabilityDescription("Updates the content of a FlowFile by searching for some textual value in the FlowFile content (via Regular Expression/regex, or literal value) and replacing the " +
+    "section of the content that matches with some alternate value. It can also be used to append or prepend text to the contents of a FlowFile.")
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 public class ReplaceText extends AbstractProcessor {
 
@@ -94,6 +95,7 @@ public class ReplaceText extends AbstractProcessor {
     public static final String ENTIRE_TEXT = "Entire text";
     public static final String prependValue = "Prepend";
     public static final String appendValue = "Append";
+    public static final String surroundValue = "Surround";
     public static final String regexReplaceValue = "Regex Replace";
     public static final String literalReplaceValue = "Literal Replace";
     public static final String alwaysReplace = "Always Replace";
@@ -114,6 +116,9 @@ public class ReplaceText extends AbstractProcessor {
             + "the value will be appended to each line. Similarly, for \"First-Line\", \"Last-Line\", \"Except-Last-Line\" and \"Except-First-Line\" Evaluation Modes,"
             + "the value will be appended to header alone, footer alone, all lines except header and all lines except footer respectively. For \"Entire Text\" evaluation mode,"
             + "the value will be appended to the entire text.");
+    static final AllowableValue SURROUND = new AllowableValue(surroundValue, surroundValue,
+        "Prepends text before the start of the FlowFile (or the start of each line, depending on the configuration of the Evaluation Mode property) " +
+            "as well as appending text to the end of the FlowFile (or the end of each line, depending on the configuration of the Evaluation Mode property)");
     static final AllowableValue LITERAL_REPLACE = new AllowableValue(literalReplaceValue, literalReplaceValue,
         "Search for all instances of the Search Value and replace the matches with the Replacement Value.");
     static final AllowableValue REGEX_REPLACE = new AllowableValue(regexReplaceValue, regexReplaceValue,
@@ -127,6 +132,14 @@ public class ReplaceText extends AbstractProcessor {
             "Substitute variable references (specified in ${var} form) using FlowFile attributes for looking up the replacement value by variable name. "
                     + "When this strategy is chosen, both the <Search Value> and <Replacement Value> properties are ignored.");
 
+
+    public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Replacement Strategy")
+        .description("The strategy for how and what to replace within the FlowFile's text content.")
+        .allowableValues(PREPEND, APPEND, SURROUND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
+        .defaultValue(REGEX_REPLACE.getValue())
+        .required(true)
+        .build();
     public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
         .name("Regular Expression")
         .displayName("Search Value")
@@ -134,6 +147,7 @@ public class ReplaceText extends AbstractProcessor {
         .required(true)
         .addValidator(Validator.VALID)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE)
         .defaultValue(DEFAULT_REGEX)
         .build();
     public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
@@ -145,6 +159,25 @@ public class ReplaceText extends AbstractProcessor {
         .required(true)
         .defaultValue(DEFAULT_REPLACEMENT_VALUE)
         .addValidator(Validator.VALID)
+        .dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, PREPEND, APPEND)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor PREPEND_TEXT = new PropertyDescriptor.Builder()
+        .name("Text to Prepend")
+        .displayName("Text to Prepend")
+        .description("The text to prepend to the start of the FlowFile, or each line, depending on teh configured value of the Evaluation Mode property")
+        .required(true)
+        .addValidator(Validator.VALID)
+        .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .build();
+    static final PropertyDescriptor APPEND_TEXT = new PropertyDescriptor.Builder()
+        .name("Text to Append")
+        .displayName("Text to Append")
+        .description("The text to append to the end of the FlowFile, or each line, depending on teh configured value of the Evaluation Mode property")
+        .required(true)
+        .addValidator(Validator.VALID)
+        .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .build();
     public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
@@ -166,13 +199,6 @@ public class ReplaceText extends AbstractProcessor {
         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
         .defaultValue("1 MB")
         .build();
-    public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
-        .name("Replacement Strategy")
-        .description("The strategy for how and what to replace within the FlowFile's text content.")
-        .allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
-        .defaultValue(REGEX_REPLACE.getValue())
-        .required(true)
-        .build();
     public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
         .name("Evaluation Mode")
         .description("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or buffer the entire file "
@@ -191,6 +217,8 @@ public class ReplaceText extends AbstractProcessor {
         .required(false)
         .build();
 
+
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -209,11 +237,13 @@ public class ReplaceText extends AbstractProcessor {
     @Override
     protected void init(final ProcessorInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(REPLACEMENT_STRATEGY);
         properties.add(SEARCH_VALUE);
         properties.add(REPLACEMENT_VALUE);
+        properties.add(PREPEND_TEXT);
+        properties.add(APPEND_TEXT);
         properties.add(CHARACTER_SET);
         properties.add(MAX_BUFFER_SIZE);
-        properties.add(REPLACEMENT_STRATEGY);
         properties.add(EVALUATION_MODE);
         properties.add(LINE_BY_LINE_EVALUATION_MODE);
         this.properties = Collections.unmodifiableList(properties);
@@ -271,7 +301,10 @@ public class ReplaceText extends AbstractProcessor {
                 replacementStrategyExecutor = new PrependReplace();
                 break;
             case appendValue:
-                replacementStrategyExecutor = new AppendReplace();
+                replacementStrategyExecutor = new SurroundReplace(null, REPLACEMENT_VALUE);
+                break;
+            case surroundValue:
+                replacementStrategyExecutor = new SurroundReplace(PREPEND_TEXT, APPEND_TEXT);
                 break;
             case regexReplaceValue:
                 // for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
@@ -454,23 +487,39 @@ public class ReplaceText extends AbstractProcessor {
 
     }
 
-    private static class AppendReplace implements ReplacementStrategyExecutor {
+    private static class SurroundReplace implements ReplacementStrategyExecutor {
+        private final PropertyDescriptor prependValueDescriptor;
+        private final PropertyDescriptor appendValueDescriptor;
+
+        public SurroundReplace(final PropertyDescriptor prependValueDescriptor, final PropertyDescriptor appendValueDescriptor) {
+            this.prependValueDescriptor = prependValueDescriptor;
+            this.appendValueDescriptor = appendValueDescriptor;
+        }
 
         @Override
         public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
-            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+            final String prependValue = (prependValueDescriptor == null) ? null : context.getProperty(prependValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
+            final String appendValue = context.getProperty(appendValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
 
             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream in, final OutputStream out) throws IOException {
+                        if (prependValue != null && !prependValue.isEmpty()) {
+                            out.write(prependValue.getBytes(charset));
+                        }
+
                         IOUtils.copy(in, out);
-                        out.write(replacementValue.getBytes(charset));
+                        out.write(appendValue.getBytes(charset));
                     }
                 });
             } else {
                 flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
                     (bw, oneLine) -> {
+                        if (prependValue != null && !prependValue.isEmpty()) {
+                            bw.write(prependValue);
+                        }
+
                         // we need to find the first carriage return or new-line so that we can append the new value
                         // before the line separate. However, we don't want to do this using a regular expression due
                         // to performance concerns. So we will find the first occurrence of either \r or \n and use
@@ -484,7 +533,7 @@ public class ReplaceText extends AbstractProcessor {
                             }
 
                             if (c == '\r' || c == '\n') {
-                                bw.write(replacementValue);
+                                bw.write(appendValue);
                                 foundNewLine = true;
                             }
 
@@ -492,7 +541,7 @@ public class ReplaceText extends AbstractProcessor {
                         }
 
                         if (!foundNewLine) {
-                            bw.write(replacementValue);
+                            bw.write(appendValue);
                         }
                     }));
             }
@@ -641,13 +690,14 @@ public class ReplaceText extends AbstractProcessor {
                         int matches = 0;
                         int lastEnd = 0;
 
-                        final Matcher matcher = searchPattern.matcher(oneLine);
-                        while (matcher.find()) {
-                            bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
+                        int index = oneLine.indexOf(searchValue, lastEnd);
+                        while (index >= 0) {
+                            bw.write(oneLine, lastEnd, index - lastEnd);
                             bw.write(replacementValue);
                             matches++;
 
-                            lastEnd = matcher.end();
+                            lastEnd = index + searchValue.length();
+                            index = oneLine.indexOf(searchValue, lastEnd);
                         }
 
                         if (matches > 0) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
index 1b5b6e53e5..9023b37835 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestReplaceText.java
@@ -1631,6 +1631,41 @@ public class TestReplaceText {
         runner.assertValid();
     }
 
+    @Test
+    public void testSurroundWithEntireText() {
+        final TestRunner runner = getRunner();
+        runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SURROUND);
+        runner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
+        runner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
+        runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.ENTIRE_TEXT);
+
+        final String input = "Hello\nThere\nHow are you\nToday?";
+        runner.enqueue(input);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+
+        final MockFlowFile output = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+        output.assertContentEquals("<pre>" + input + "<post>");
+    }
+
+    @Test
+    public void testSurroundLineByLine() {
+        final TestRunner runner = getRunner();
+        runner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SURROUND);
+        runner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
+        runner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
+        runner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);
+
+        final String input = "Hello\nThere\nHow are you\nToday?";
+        runner.enqueue(input);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
+
+        final MockFlowFile output = runner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
+        output.assertContentEquals("<pre>Hello<post>\n<pre>There<post>\n<pre>How are you<post>\n<pre>Today?<post>");
+    }
+
+
     @Test
     public void testBackReferenceEscapeWithRegexReplaceUsingEL() {
         final TestRunner runner = getRunner();