You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/04/08 07:33:40 UTC
[nifi] 02/02: NIFI-5979 Add Line-by-Line Evaluation Mode to
ReplaceText
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1c588f10b276dc8d2937b2c78caf0e034bc43fa8
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Mon Apr 8 11:35:30 2019 +0900
NIFI-5979 Add Line-by-Line Evaluation Mode to ReplaceText
Refactored to use lambda functions to better handle strategy-specific variables
via closures.
---
.../nifi/processors/standard/ReplaceText.java | 233 ++++++++++-----------
1 file changed, 105 insertions(+), 128 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 851770e..773458b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -53,7 +53,6 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.LineDemarcator;
import org.apache.nifi.util.StopWatch;
-import javax.annotation.Nullable;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
@@ -386,31 +385,28 @@ public class ReplaceText extends AbstractProcessor {
}
});
} else {
- flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null));
- }
+ flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+ ((bw, oneLine) -> {
+ // We need to determine what line ending was used and use that after our replacement value.
+ lineEndingBuilder.setLength(0);
+ for (int i = oneLine.length() - 1; i >= 0; i--) {
+ final char c = oneLine.charAt(i);
+ if (c == '\r' || c == '\n') {
+ lineEndingBuilder.append(c);
+ } else {
+ break;
+ }
+ }
- return flowFile;
- }
+ bw.write(replacementValue);
- public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
- final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
- final StringBuilder lineEndingBuilder = new StringBuilder(2);
- // We need to determine what line ending was used and use that after our replacement value.
- lineEndingBuilder.setLength(0);
- for (int i = oneLine.length() - 1; i >= 0; i--) {
- final char c = oneLine.charAt(i);
- if (c == '\r' || c == '\n') {
- lineEndingBuilder.append(c);
- } else {
- break;
- }
+ // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
+ // So if builder has multiple characters, they are now reversed from the original string's ordering.
+ bw.write(lineEndingBuilder.reverse().toString());
+ })));
}
- bw.write(replacementValue);
-
- // Preserve original line endings. Reverse string because we iterated over original line ending in reverse order, appending to builder.
- // So if builder has multiple characters, they are now reversed from the original string's ordering.
- bw.write(lineEndingBuilder.reverse().toString());
+ return flowFile;
}
@Override
@@ -433,7 +429,8 @@ public class ReplaceText extends AbstractProcessor {
}
});
} else {
- flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null));
+ flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+ (bw, oneLine) -> bw.write(replacementValue.concat(oneLine))));
}
return flowFile;
}
@@ -443,11 +440,6 @@ public class ReplaceText extends AbstractProcessor {
return false;
}
- @Override
- public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
- final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
- bw.write(replacementValue.concat(oneLine));
- }
}
private class AppendReplace implements ReplacementStrategyExecutor {
@@ -465,36 +457,34 @@ public class ReplaceText extends AbstractProcessor {
}
});
} else {
- flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, null));
- }
- return flowFile;
- }
-
- public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
- String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
- // we need to find the first carriage return or new-line so that we can append the new value
- // before the line separate. However, we don't want to do this using a regular expression due
- // to performance concerns. So we will find the first occurrence of either \r or \n and use
- // that to insert the replacement value.
- boolean foundNewLine = false;
- for (int i = 0; i < oneLine.length(); i++) {
- final char c = oneLine.charAt(i);
- if (foundNewLine) {
- bw.write(c);
- continue;
- }
-
- if (c == '\r' || c == '\n') {
- bw.write(replacementValue);
- foundNewLine = true;
- }
-
- bw.write(c);
- }
+ flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+ (bw, oneLine) -> {
+ // we need to find the first carriage return or new-line so that we can append the new value
+ // before the line separate. However, we don't want to do this using a regular expression due
+ // to performance concerns. So we will find the first occurrence of either \r or \n and use
+ // that to insert the replacement value.
+ boolean foundNewLine = false;
+ for (int i = 0; i < oneLine.length(); i++) {
+ final char c = oneLine.charAt(i);
+ if (foundNewLine) {
+ bw.write(c);
+ continue;
+ }
+
+ if (c == '\r' || c == '\n') {
+ bw.write(replacementValue);
+ foundNewLine = true;
+ }
+
+ bw.write(c);
+ }
- if (!foundNewLine) {
- bw.write(replacementValue);
+ if (!foundNewLine) {
+ bw.write(replacementValue);
+ }
+ }));
}
+ return flowFile;
}
@Override
@@ -580,46 +570,42 @@ public class ReplaceText extends AbstractProcessor {
}
} else {
- updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, searchPattern));
- }
+ final Matcher matcher = searchPattern.matcher("");
+ updatedFlowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+ (bw, oneLine) -> {
+ additionalAttrs.clear();
+ matcher.reset(oneLine);
- return updatedFlowFile;
- }
+ int matches = 0;
+ StringBuffer sb = new StringBuffer();
- public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
- additionalAttrs.clear();
- if (matcher == null) {
- matcher = searchPattern.matcher(oneLine);
- } else {
- matcher.reset(oneLine);
- }
+ while (matcher.find()) {
+ matches++;
- int matches = 0;
- StringBuffer sb = new StringBuffer();
+ for (int i=0; i <= matcher.groupCount(); i++) {
+ additionalAttrs.put("$" + i, matcher.group(i));
+ }
- while (matcher.find()) {
- matches++;
+ String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
+ replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
+ String replacementFinal = normalizeReplacementString(replacement);
- for (int i=0; i <= matcher.groupCount(); i++) {
- additionalAttrs.put("$" + i, matcher.group(i));
- }
+ matcher.appendReplacement(sb, replacementFinal);
+ }
- String replacement = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, additionalAttrs, escapeBackRefDecorator).getValue();
- replacement = escapeLiteralBackReferences(replacement, numCapturingGroups);
- String replacementFinal = normalizeReplacementString(replacement);
+ if (matches > 0) {
+ matcher.appendTail(sb);
- matcher.appendReplacement(sb, replacementFinal);
+ final String updatedValue = sb.toString();
+ bw.write(updatedValue);
+ } else {
+ // No match. Just write out the line as it was.
+ bw.write(oneLine);
+ }
+ }));
}
- if (matches > 0) {
- matcher.appendTail(sb);
-
- final String updatedValue = sb.toString();
- bw.write(updatedValue);
- } else {
- // No match. Just write out the line as it was.
- bw.write(oneLine);
- }
+ return updatedFlowFile;
}
@Override
@@ -657,30 +643,28 @@ public class ReplaceText extends AbstractProcessor {
} else {
final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL);
- flowFile = session.write(flowFile, new StreamReplaceCallback(this, charset, maxBufferSize, context, flowFile, searchPattern));
- }
- return flowFile;
- }
-
- public void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException {
- String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
- int matches = 0;
- int lastEnd = 0;
-
+ flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
+ (bw, oneLine) -> {
+ int matches = 0;
+ int lastEnd = 0;
- while (matcher.find()) {
- bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
- bw.write(replacementValue);
- matches++;
+ final Matcher matcher = searchPattern.matcher(oneLine);
+ while (matcher.find()) {
+ bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
+ bw.write(replacementValue);
+ matches++;
- lastEnd = matcher.end();
- }
+ lastEnd = matcher.end();
+ }
- if (matches > 0) {
- bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
- } else {
- bw.write(oneLine);
+ if (matches > 0) {
+ bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
+ } else {
+ bw.write(oneLine);
+ }
+ }));
}
+ return flowFile;
}
@Override
@@ -706,49 +690,43 @@ public class ReplaceText extends AbstractProcessor {
FlowFile replace(FlowFile flowFile, ProcessSession session, ProcessContext context, String evaluateMode, Charset charset, int maxBufferSize);
boolean isAllDataBufferedForEntireText();
+ }
- void replaceInLine(BufferedWriter bw, String oneLine, @Nullable Matcher matcher, @Nullable Pattern searchPattern, ProcessContext context, FlowFile flowFile) throws IOException ;
+ @FunctionalInterface
+ private interface ReplaceLine {
+ void apply(BufferedWriter bw, String oneLine) throws IOException;
}
private class StreamReplaceCallback implements StreamCallback {
private final Charset charset;
private final int maxBufferSize;
- private final ProcessContext context;
- private final FlowFile flowFile;
- private final ReplacementStrategyExecutor replacementStrategyExecutor;
- private final Pattern searchPattern;
+ private final String lineByLineEvaluationMode;
+ private final ReplaceLine replaceLine;
- public StreamReplaceCallback(ReplacementStrategyExecutor replacementStrategyExecutor,
- Charset charset,
+ private StreamReplaceCallback(Charset charset,
int maxBufferSize,
- ProcessContext context,
- FlowFile flowFile,
- @Nullable Pattern searchPattern) {
- this.replacementStrategyExecutor = replacementStrategyExecutor;
+ String lineByLineEvaluationMode,
+ ReplaceLine replaceLine) {
this.charset = charset;
this.maxBufferSize = maxBufferSize;
- this.context = context;
- this.flowFile = flowFile;
- this.searchPattern = searchPattern;
+ this.lineByLineEvaluationMode = lineByLineEvaluationMode;
+ this.replaceLine = replaceLine;
}
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- final String lineByLineEvaluationMode = context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue();
try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String precedingLine = demarcator.nextLine();
String succeedingLine;
- Matcher matcher = null;
boolean firstLine = true;
while (null != (succeedingLine = demarcator.nextLine())) {
- matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null;
if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(FIRST_LINE)){
- replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
+ replaceLine.apply(bw, precedingLine);
firstLine = false;
} else if(firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE)) {
firstLine = false;
@@ -757,7 +735,7 @@ public class ReplaceText extends AbstractProcessor {
|| lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_LAST_LINE)
|| lineByLineEvaluationMode.equalsIgnoreCase(ALL)
|| (!firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
- replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
+ replaceLine.apply(bw, precedingLine);
} else {
bw.write(precedingLine);
}
@@ -771,8 +749,7 @@ public class ReplaceText extends AbstractProcessor {
|| (firstLine && lineByLineEvaluationMode.equalsIgnoreCase(EXCEPT_FIRST_LINE))) {
bw.write(precedingLine);
} else {
- matcher = null != searchPattern ? searchPattern.matcher(precedingLine) : null;
- replacementStrategyExecutor.replaceInLine(bw, precedingLine, matcher, searchPattern, context, flowFile);
+ replaceLine.apply(bw, precedingLine);
}
}
}