You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this copy).
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/04/08 07:33:40 UTC

[nifi] 02/02: NIFI-5979 Add Line-by-Line Evaluation Mode to ReplaceText

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1c588f10b276dc8d2937b2c78caf0e034bc43fa8
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Mon Apr 8 11:35:30 2019 +0900

    NIFI-5979 Add Line-by-Line Evaluation Mode to ReplaceText
    
    Refactored to use a functional interface (lambdas) so that strategy-specific
    variables are captured via closures.
---
 .../nifi/processors/standard/ReplaceText.java      | 233 ++++++++++-----------
 1 file changed, 105 insertions(+), 128 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 851770e..773458b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -53,7 +53,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.util.LineDemarcator;
 import org.apache.nifi.util.StopWatch;
 
-import javax.annotation.Nullable;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -386,31 +385,28 @@ public class ReplaceText extends AbstractProcessor {
                     }
                 });
             } else {
-                flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile,  null));
-            }
+                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+                    ((bw, oneLine) -> {
+                        // We need to determine what line ending was used and use that after our replacement value.
+                        lineEndingBuilder.setLength(0);
+                        for (int i = oneLine.length() - 1; i >= 0; i--) {
+                            final char c = oneLine.charAt(i);
+                            if (c == '\r' || c == '\n') {
+                                lineEndingBuilder.append(c);
+                            } else {
+                                break;
+                            }
+                        }
 
-            return flowFile;
-        }
+                        bw.write(replacementValue);
 
-        public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
-            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
-            final StringBuilder lineEndingBuilder = new StringBuilder(2);
-            // We need to determine what line ending was used and use that after our replacement value.
-            lineEndingBuilder.setLength(0);
-            for (int i = oneLine.length() - 1; i >= 0; i--) {
-                final char c = oneLine.charAt(i);
-                if (c == '\r' || c == '\n') {
-                    lineEndingBuilder.append(c);
-                } else {
-                    break;
-                }
+                        // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
+                        // So if builder has multiple characters, they are now reversed from the original string's ordering.
+                        bw.write(lineEndingBuilder.reverse().toString());
+                    })));
             }
 
-            bw.write(replacementValue);
-
-            // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
-            // So if builder has multiple characters, they are now reversed from the original string's ordering.
-            bw.write(lineEndingBuilder.reverse().toString());
+            return flowFile;
         }
 
         @Override
@@ -433,7 +429,8 @@ public class ReplaceText extends AbstractProcessor {
                     }
                 });
             } else {
-                flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile,  null));
+                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+                    (bw, oneLine) -> bw.write(replacementValue.concat(oneLine))));
             }
             return flowFile;
         }
@@ -443,11 +440,6 @@ public class ReplaceText extends AbstractProcessor {
             return false;
         }
 
-        @Override
-        public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
-            final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
-            bw.write(replacementValue.concat(oneLine));
-        }
     }
 
     private class AppendReplace implements ReplacementStrategyExecutor {
@@ -465,36 +457,34 @@ public class ReplaceText extends AbstractProcessor {
                     }
                 });
             } else {
-                flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile,  null));
-            }
-            return flowFile;
-        }
-
-        public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
-            String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
-            // we need to find the first carriage return or new-line so that we can append the new value
-            // before the line separate. However, we don't want to do this using a regular expression due
-            // to performance concerns. So we will find the first occurrence of either \r or \n and use
-            // that to insert the replacement value.
-            boolean foundNewLine = false;
-            for (int i = 0; i < oneLine.length(); i++) {
-                final char c = oneLine.charAt(i);
-                if (foundNewLine) {
-                    bw.write(c);
-                    continue;
-                }
-
-                if (c == '\r' || c == '\n') {
-                    bw.write(replacementValue);
-                    foundNewLine = true;
-                }
-
-                bw.write(c);
-            }
+                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+                    (bw, oneLine) -> {
+                        // we need to find the first carriage return or new-line so that we can append the new value
+                        // before the line separate. However, we don't want to do this using a regular expression due
+                        // to performance concerns. So we will find the first occurrence of either \r or \n and use
+                        // that to insert the replacement value.
+                        boolean foundNewLine = false;
+                        for (int i = 0; i < oneLine.length(); i++) {
+                            final char c = oneLine.charAt(i);
+                            if (foundNewLine) {
+                                bw.write(c);
+                                continue;
+                            }
+
+                            if (c == '\r' || c == '\n') {
+                                bw.write(replacementValue);
+                                foundNewLine = true;
+                            }
+
+                            bw.write(c);
+                        }
 
-            if (!foundNewLine) {
-                bw.write(replacementValue);
+                        if (!foundNewLine) {
+                            bw.write(replacementValue);
+                        }
+                    }));
             }
+            return flowFile;
         }
 
         @Override
@@ -580,46 +570,42 @@ public class ReplaceText extends AbstractProcessor {
                 }
 
             } else {
-                updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile,  searchPattern));
-            }
+                final Matcher matcher = searchPattern.matcher("");
+                updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+                    (bw, oneLine) -> {
+                        additionalAttrs.clear();
+                            matcher.reset(oneLine);
 
-            return updatedFlowFile;
-        }
+                        int matches = 0;
+                        StringBuffer sb = new StringBuffer();
 
-        public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
-            additionalAttrs.clear();
-            if (matcher == null) {
-                matcher = searchPattern.matcher(oneLine);
-            } else {
-                matcher.reset(oneLine);
-            }
+                        while (matcher.find()) {
+                            matches++;
 
-            int matches = 0;
-            StringBuffer sb = new StringBuffer();
+                            for (int i=0; i <= matcher.groupCount(); i++) {
+                                additionalAttrs.put("$" + i, matcher.group(i));
+                            }
 
-            while (matcher.find()) {
-                matches++;
+                            String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
+                            replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
+                            String replacementFinal = normalizeReplacementString(replacement);
 
-                for (int i=0; i <= matcher.groupCount(); i++) {
-                    additionalAttrs.put("$" + i, matcher.group(i));
-                }
+                            matcher.appendReplacement(sb, replacementFinal);
+                        }
 
-                String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
-                replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
-                String replacementFinal = normalizeReplacementString(replacement);
+                        if (matches > 0) {
+                            matcher.appendTail(sb);
 
-                matcher.appendReplacement(sb, replacementFinal);
+                            final String updatedValue = sb.toString();
+                            bw.write(updatedValue);
+                        } else {
+                            // No match. Just write out the line as it was.
+                            bw.write(oneLine);
+                        }
+                    }));
             }
 
-            if (matches > 0) {
-                matcher.appendTail(sb);
-
-                final String updatedValue = sb.toString();
-                bw.write(updatedValue);
-            } else {
-                // No match. Just write out the line as it was.
-                bw.write(oneLine);
-            }
+            return updatedFlowFile;
         }
 
         @Override
@@ -657,30 +643,28 @@ public class ReplaceText extends AbstractProcessor {
             } else {
                 final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL);
 
-                flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile,  searchPattern));
-            }
-            return flowFile;
-        }
-
-        public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
-            String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
-            int matches = 0;
-            int lastEnd = 0;
-
+                flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+                    (bw, oneLine) -> {
+                        int matches = 0;
+                        int lastEnd = 0;
 
-            while (matcher.find()) {
-                bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
-                bw.write(replacementValue);
-                matches++;
+                        final Matcher matcher = searchPattern.matcher(oneLine);
+                        while (matcher.find()) {
+                            bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
+                            bw.write(replacementValue);
+                            matches++;
 
-                lastEnd = matcher.end();
-            }
+                            lastEnd = matcher.end();
+                        }
 
-            if (matches > 0) {
-                bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
-            } else {
-                bw.write(oneLine);
+                        if (matches > 0) {
+                            bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
+                        } else {
+                            bw.write(oneLine);
+                        }
+                    }));
             }
+            return flowFile;
         }
 
         @Override
@@ -706,49 +690,43 @@ public class ReplaceText extends AbstractProcessor {
         FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize);
 
         boolean isAllDataBufferedForEntireText();
+    }
 
-        void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException ;
+    @FunctionalInterface
+    private interface ReplaceLine {
+        void apply(BufferedWriter bw, String oneLine) throws IOException;
     }
 
 
     private class StreamReplaceCallback implements StreamCallback {
         private final Charset charset;
         private final int maxBufferSize;
-        private final ProcessContext context;
-        private final FlowFile flowFile;
-        private final ReplacementStrategyExecutor replacementStrategyExecutor;
-        private final Pattern searchPattern;
+        private final String lineByLineEvaluationMode;
+        private final ReplaceLine replaceLine;
 
-        public StreamReplaceCallback(ReplacementStrategyExecutor replacementStrategyExecutor,
-                                     Charset charset,
+        private StreamReplaceCallback(Charset charset,
                                      int maxBufferSize,
-                                     ProcessContext context,
-                                     FlowFile flowFile,
-                                     @Nullable Pattern searchPattern) {
-            this.replacementStrategyExecutor = replacementStrategyExecutor;
+                                     String lineByLineEvaluationMode,
+                                     ReplaceLine replaceLine) {
             this.charset = charset;
             this.maxBufferSize = maxBufferSize;
-            this.context = context;
-            this.flowFile = flowFile;
-            this.searchPattern = searchPattern;
+            this.lineByLineEvaluationMode = lineByLineEvaluationMode;
+            this.replaceLine = replaceLine;
         }
 
         @Override
         public void process(final InputStream in, final OutputStream out) throws IOException {
-            final String lineByLineEvaluationMode = context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue();
             try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
                  final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
 
                 String precedingLine = demarcator.nextLine();
                 String succeedingLine;
-                Matcher matcher = null;
 
                 boolean firstLine = true;
 
                 while (null != (succeedingLine = demarcator.nextLine())) {
-                    matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null;
                     if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE)){
-                        replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
+                        replaceLine.apply(bw, precedingLine);
                         firstLine = false;
                     } else if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE)) {
                         firstLine = false;
@@ -757,7 +735,7 @@ public class ReplaceText extends AbstractProcessor {
                         || lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE)
                         || lineByLineEvaluationMode.equalsIgnoreCase(ALL)
                         || (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
-                        replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
+                        replaceLine.apply(bw, precedingLine);
                     } else {
                         bw.write(precedingLine);
                     }
@@ -771,8 +749,7 @@ public class ReplaceText extends AbstractProcessor {
                         || (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
                         bw.write(precedingLine);
                     } else {
-                        matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null;
-                        replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
+                        replaceLine.apply(bw, precedingLine);
                     }
                 }
             }