You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pw...@apache.org on 2018/11/09 21:32:22 UTC
[1/2] nifi git commit: NIFI-5718: Implemented LineDemarcator and
removed NLKBufferedReader in order to improve performance
Repository: nifi
Updated Branches:
refs/heads/master 765df6781 -> 830f7aa84
NIFI-5718: Implemented LineDemarcator and removed NLKBufferedReader in order to improve performance
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/564ad0cd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/564ad0cd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/564ad0cd
Branch: refs/heads/master
Commit: 564ad0cd71e56b426e02757c7a0a70e28ccbea12
Parents: 765df67
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Oct 18 12:05:16 2018 -0400
Committer: Peter Wicks <pa...@gmail.com>
Committed: Fri Nov 9 14:26:20 2018 -0700
----------------------------------------------------------------------
.../nifi/stream/io/RepeatingInputStream.java | 103 +++++++++++++
.../stream/io/util/AbstractTextDemarcator.java | 147 +++++++++++++++++++
.../nifi/stream/io/util/LineDemarcator.java | 116 +++++++++++++++
.../nifi/stream/io/util/TestLineDemarcator.java | 120 +++++++++++++++
.../nifi/processors/standard/ReplaceText.java | 71 +++++----
.../nifi/processors/standard/RouteText.java | 49 +++----
.../standard/util/NLKBufferedReader.java | 76 ----------
7 files changed, 549 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
new file mode 100644
index 0000000..f542741
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
+/**
+ * An {@link InputStream} that serves the content of a byte array a fixed number of times,
+ * back to back, and then reports end of stream.
+ *
+ * <p>The array passed to the constructor is not copied, so changes made to it while the stream
+ * is in use are visible to readers. Instances keep mutable state without any synchronization.</p>
+ */
+public class RepeatingInputStream extends InputStream {
+    private final byte[] toRepeat;
+    private final int maxIterations;
+
+    // Stream over the iteration currently being served.
+    private InputStream bais;
+    // Number of iterations started so far, including the one currently being served.
+    private int repeatCount;
+
+
+    /**
+     * Creates a stream that serves {@code toRepeat} exactly {@code iterations} times.
+     *
+     * @param toRepeat the bytes to serve on every iteration; must not be null or empty
+     * @param iterations how many times the bytes are served; must be at least 1
+     * @throws IllegalArgumentException if {@code iterations} is less than 1 or {@code toRepeat} is empty
+     * @throws NullPointerException if {@code toRepeat} is null
+     */
+    public RepeatingInputStream(final byte[] toRepeat, final int iterations) {
+        if (iterations < 1) {
+            throw new IllegalArgumentException("'iterations' must be at least 1 but was " + iterations);
+        }
+        if (Objects.requireNonNull(toRepeat, "'toRepeat' must not be null").length == 0) {
+            throw new IllegalArgumentException("'toRepeat' must not be empty");
+        }
+
+        this.toRepeat = toRepeat;
+        this.maxIterations = iterations;
+
+        // The first iteration is set up directly; repeat() is only needed for later iterations.
+        this.bais = new ByteArrayInputStream(toRepeat);
+        this.repeatCount = 1;
+    }
+
+    /**
+     * Reads the next byte, rolling over to the next iteration when the current one is exhausted.
+     *
+     * @return the next byte as a value in the range 0-255, or -1 once every iteration has been served
+     */
+    @Override
+    public int read() throws IOException {
+        final int value = bais.read();
+        if (value > -1 || !repeat()) {
+            return value;
+        }
+
+        return bais.read();
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b}. A single call never crosses an iteration
+     * boundary, so fewer than {@code len} bytes may be returned even though more data follows.
+     *
+     * @return the number of bytes read, or -1 once every iteration has been served
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        final int count = bais.read(b, off, len);
+        if (count > -1 || !repeat()) {
+            return count;
+        }
+
+        return bais.read(b, off, len);
+    }
+
+    /**
+     * Reads up to {@code b.length} bytes into {@code b}; equivalent to {@code read(b, 0, b.length)}.
+     *
+     * @return the number of bytes read, or -1 once every iteration has been served
+     */
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Starts the next iteration if any remain.
+     *
+     * @return true if a new iteration was started, false if all iterations have been used up
+     */
+    private boolean repeat() {
+        if (repeatCount >= maxIterations) {
+            return false;
+        }
+
+        repeatCount++;
+        bais = new ByteArrayInputStream(toRepeat);
+
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
new file mode 100644
index 0000000..f10f66d
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.BufferOverflowException;
+
+/**
+ * Base class for demarcators that split character data read from a {@link Reader} (for example,
+ * one that decodes an {@link InputStream}) into tokens. It owns the character buffer and the
+ * {@link #fill()} operation that tops the buffer up, growing or compacting it as needed;
+ * subclasses scan the buffer and decide where tokens end.
+ */
+public abstract class AbstractTextDemarcator implements Closeable {
+
+    private static final int INIT_BUFFER_SIZE = 8192;
+
+    /*
+     * Upper bound on how many characters a single expansion adds to the buffer.
+     */
+    private static final int MAX_EXPANSION_INCREMENT = 1_048_576;
+
+    private final Reader reader;
+
+    /*
+     * The maximum allowed size of a token, in characters. The buffer is never
+     * grown beyond this size; fill() throws BufferOverflowException when a
+     * token needs more room than that.
+     */
+    private final int maxDataSize;
+
+    /*
+     * Buffer into which characters are read from the provided Reader. Its
+     * initial size is the 'initialBufferSize' provided in the constructor or
+     * defaults to the value of the INIT_BUFFER_SIZE constant.
+     */
+    char[] buffer;
+
+    /*
+     * Offset within 'buffer' of the next character to examine. fill() appends
+     * newly read characters starting at this offset.
+     */
+    int index;
+
+    /*
+     * Starting offset of the token currently being demarcated within 'buffer'.
+     * When the buffer is full, fill() uses it to decide what to do: a value of
+     * 0 means the pending token already starts at the beginning of the buffer,
+     * so the buffer must be expanded; any other value means the characters
+     * before 'mark' are no longer needed and the pending data is shifted left.
+     */
+    int mark;
+
+    /*
+     * The number of characters in 'buffer' that are valid for reading. It may
+     * be smaller than the buffer length (e.g., at the end of the stream). It is
+     * set by fill() every time more characters are read into the buffer and
+     * becomes -1 once the Reader reports end of stream.
+     */
+    int availableBytesLength;
+
+    /**
+     * Constructs an instance of demarcator with the provided {@link Reader}
+     * and max token size, using the default initial buffer size.
+     *
+     * @param reader the source of characters; must not be null
+     * @param maxDataSize the maximum token size, in characters; must be > 0
+     */
+    AbstractTextDemarcator(Reader reader, int maxDataSize) {
+        this(reader, maxDataSize, INIT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs an instance of demarcator with the provided {@link Reader},
+     * max token size and initial buffer size.
+     *
+     * @param reader the source of characters; must not be null
+     * @param maxDataSize the maximum token size, in characters; must be > 0
+     * @param initialBufferSize the initial size of the buffer, in characters; must be > 0
+     * @throws IllegalArgumentException if any argument violates the constraints above
+     */
+    AbstractTextDemarcator(Reader reader, int maxDataSize, int initialBufferSize) {
+        this.validate(reader, maxDataSize, initialBufferSize);
+        this.reader = reader;
+        this.buffer = new char[initialBufferSize];
+        this.maxDataSize = maxDataSize;
+    }
+
+    /**
+     * Closes the underlying {@link Reader}.
+     */
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    /**
+     * Will fill the current buffer from the current 'index' position, expanding
+     * it or shifting its content left if it is full. On return,
+     * 'availableBytesLength' is the new end of valid data, or -1 if the end of
+     * the stream was reached.
+     *
+     * <p>Note that an overflow is reported with a {@link BufferOverflowException},
+     * not a {@link TokenTooLargeException}.</p>
+     *
+     * @throws IOException
+     *             if unable to read from the stream
+     * @throws BufferOverflowException
+     *             if the buffer is full, the pending token starts at offset 0
+     *             and the buffer has already reached the max token size
+     */
+    void fill() throws IOException {
+        if (this.index >= this.buffer.length) {
+            if (this.mark == 0) { // expand
+                if (this.buffer.length >= this.maxDataSize) {
+                    throw new BufferOverflowException();
+                }
+
+                // Computed in long arithmetic so that very large buffers cannot overflow int
+                // and produce a negative array size.
+                final long doubled = 2L * this.buffer.length;
+                final long stepped = (long) this.buffer.length + MAX_EXPANSION_INCREMENT;
+                // Clamp to maxDataSize so that a token which fits within the limit is not
+                // rejected merely because doubling would overshoot it.
+                final int expandedSize = (int) Math.min(Math.min(doubled, stepped), this.maxDataSize);
+
+                char[] newBuff = new char[expandedSize];
+                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
+                this.buffer = newBuff;
+            } else { // shift the data left in the buffer
+                int length = this.index - this.mark;
+                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
+                this.index = length;
+                this.mark = 0;
+            }
+        }
+
+        int bytesRead;
+        /*
+         * The do/while pattern is used here similar to the way it is used in
+         * BufferedReader essentially protecting from assuming the EOS until it
+         * actually is since not every implementation of InputStream guarantees
+         * that bytes are always available while the stream is open.
+         */
+        do {
+            bytesRead = reader.read(this.buffer, this.index, this.buffer.length - this.index);
+        } while (bytesRead == 0);
+        this.availableBytesLength = bytesRead != -1 ? this.index + bytesRead : -1;
+    }
+
+
+    /**
+     * Validates prerequisites for constructor arguments
+     */
+    private void validate(Reader reader, int maxDataSize, int initialBufferSize) {
+        if (reader == null) {
+            throw new IllegalArgumentException("'reader' must not be null");
+        } else if (maxDataSize <= 0) {
+            throw new IllegalArgumentException("'maxDataSize' must be > 0");
+        } else if (initialBufferSize <= 0) {
+            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
new file mode 100644
index 0000000..c08b1a4
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+
+/**
+ * Splits text into lines while keeping the line terminators. A line ends with a new line
+ * (<code>\n</code>), a carriage return (<code>\r</code>), or a carriage return immediately followed
+ * by a new line (<code>\r\n</code>). Unlike BufferedReader, which strips the terminator from each line
+ * it returns, every String produced by {@link #nextLine()} still carries its terminator.
+ *
+ * For example, given the text
+ *
+ * <code>ABC\rXYZ\nABCXYZ\r\nhello</code>
+ *
+ * successive calls to {@link #nextLine()} return these 4 values:
+ *
+ * <ul>
+ * <li><code>ABC\r</code></li>
+ * <li><code>XYZ\n</code></li>
+ * <li><code>ABCXYZ\r\n</code></li>
+ * <li><code>hello</code></li>
+ * </ul>
+ *
+ * Every call after that returns <code>null</code>.
+ */
+public class LineDemarcator extends AbstractTextDemarcator {
+    private static final char CARRIAGE_RETURN = '\r';
+    private static final char NEW_LINE = '\n';
+
+    // The most recently examined character; it survives across calls so that a carriage return
+    // seen at the end of one scan can be resolved by the first character of the next scan.
+    private char lastChar;
+
+    /**
+     * Creates a demarcator over an {@link InputStream}, decoding its bytes with the given charset.
+     */
+    public LineDemarcator(final InputStream in, final Charset charset, final int maxDataSize, final int initialBufferSize) {
+        this(new InputStreamReader(in, charset), maxDataSize, initialBufferSize);
+    }
+
+    /**
+     * Creates a demarcator over a {@link Reader}.
+     */
+    public LineDemarcator(final Reader reader, final int maxDataSize, final int initialBufferSize) {
+        super(reader, maxDataSize, initialBufferSize);
+    }
+
+    /**
+     * Returns the next line of text, including its line terminator if it has one.
+     *
+     * @return the next line, or null once the end of the stream has been reached
+     * @throws IOException if unable to read from the stream
+     */
+    public String nextLine() throws IOException {
+        while (this.availableBytesLength != -1) {
+            if (this.index >= this.availableBytesLength) {
+                this.fill();
+            }
+
+            if (this.availableBytesLength == -1) {
+                // End of stream: whatever lies between 'mark' and 'index' is the final,
+                // possibly unterminated, line.
+                final int remaining = this.index - this.mark;
+                return remaining == 0 ? null : new String(this.buffer, this.mark, remaining);
+            }
+
+            int position = this.index;
+            while (position < this.availableBytesLength) {
+                final char current = this.buffer[position];
+                final char previous = lastChar;
+                lastChar = current;
+
+                if (current == NEW_LINE) {
+                    // The line runs up to and including this new line.
+                    return emitLine(position + 1, position + 1);
+                }
+                if (previous == CARRIAGE_RETURN) {
+                    // A lone carriage return ended the line just before this character. The character
+                    // counts as examined but is left in the buffer as the start of the next line.
+                    return emitLine(position + 1, position);
+                }
+
+                position++;
+            }
+
+            this.index = position;
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds the line spanning from 'mark' (inclusive) to lineEnd (exclusive), then moves 'index'
+     * to nextIndex and 'mark' to lineEnd.
+     */
+    private String emitLine(final int nextIndex, final int lineEnd) {
+        this.index = nextIndex;
+        final String line = new String(this.buffer, this.mark, lineEnd - this.mark);
+        this.mark = lineEnd;
+        return line;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
new file mode 100644
index 0000000..768a60a
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies that LineDemarcator splits text on \n, \r and \r\n and keeps the terminators in the
+ * lines it returns, including when a terminator falls on a buffer boundary.
+ */
+public class TestLineDemarcator {
+
+    @Test
+    public void testSingleCharacterLines() throws IOException {
+        final List<String> expected = Arrays.asList("A\n", "B\n", "C\r", "D\r\n", "E\r\n", "F\r", "\r", "G");
+        assertEquals(expected, getLines("A\nB\nC\rD\r\nE\r\nF\r\rG"));
+    }
+
+
+    @Test
+    public void testEmptyStream() throws IOException {
+        assertEquals(Collections.emptyList(), getLines(""));
+    }
+
+    @Test
+    public void testOnlyEmptyLines() throws IOException {
+        final List<String> expected = Arrays.asList("\r", "\r", "\r\n", "\n", "\n", "\r\n");
+        assertEquals(expected, getLines("\r\r\r\n\n\n\r\n"));
+    }
+
+    @Test
+    public void testOnBufferSplit() throws IOException {
+        // With a 4-character buffer the \r is the last character of the first read and the \n
+        // the first character of the second.
+        final List<String> expected = Arrays.asList("ABC\r\n", "XYZ");
+        assertEquals(expected, getLines("ABC\r\nXYZ", 10, 4));
+    }
+
+    @Test
+    public void testEndsWithCarriageReturn() throws IOException {
+        assertEquals(Arrays.asList("ABC\r"), getLines("ABC\r"));
+    }
+
+    @Test
+    public void testEndsWithNewLine() throws IOException {
+        assertEquals(Arrays.asList("ABC\n"), getLines("ABC\n"));
+    }
+
+    @Test
+    public void testEndsWithCarriageReturnNewLine() throws IOException {
+        assertEquals(Arrays.asList("ABC\r\n"), getLines("ABC\r\n"));
+    }
+
+    @Test
+    public void testReadAheadInIsEol() throws IOException {
+        final List<String> expected = Arrays.asList("he\r", "a-to-a\r", "b-to-b\r", "c-to-c\r\n", "d-to-d");
+        assertEquals(expected, getLines("he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d", 10, 10));
+    }
+
+    @Test
+    public void testFirstCharMatchOnly() throws IOException {
+        final List<String> expected = Arrays.asList("\n", "The quick brown fox jumped over the lazy dog.");
+        assertEquals(expected, getLines("\nThe quick brown fox jumped over the lazy dog."));
+    }
+
+    /**
+     * Demarcates the given text using a max data size and buffer size of 8192 characters each.
+     */
+    private List<String> getLines(final String text) throws IOException {
+        return getLines(text, 8192, 8192);
+    }
+
+    /**
+     * Encodes the text as UTF-8, runs it through a LineDemarcator configured with the given sizes and
+     * collects every line returned until the demarcator reports the end of the stream.
+     */
+    private List<String> getLines(final String text, final int maxDataSize, final int bufferSize) throws IOException {
+        final List<String> collected = new ArrayList<>();
+        final Reader source = new InputStreamReader(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8);
+
+        try (final LineDemarcator demarcator = new LineDemarcator(source, maxDataSize, bufferSize)) {
+            for (String next = demarcator.nextLine(); next != null; next = demarcator.nextLine()) {
+                collected.add(next);
+            }
+        }
+
+        return collected;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 3108a6c..c6aec0c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -49,14 +49,13 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.NLKBufferedReader;
import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.LineDemarcator;
import org.apache.nifi.util.StopWatch;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
@@ -342,7 +341,7 @@ public class ReplaceText extends AbstractProcessor {
final StringBuilder sb = new StringBuilder(value.length() + 1);
final int groupStart = backRefMatcher.start(1);
- sb.append(value.substring(0, groupStart - 1));
+ sb.append(value, 0, groupStart - 1);
sb.append("\\");
sb.append(value.substring(groupStart - 1));
value = sb.toString();
@@ -370,11 +369,11 @@ public class ReplaceText extends AbstractProcessor {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
+ try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
+ final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
String line;
- while ((line = br.readLine()) != null) {
+ while ((line = demarcator.nextLine()) != null) {
// We need to determine what line ending was used and use that after our replacement value.
lineEndingBuilder.setLength(0);
for (int i = line.length() - 1; i >= 0; i--) {
@@ -423,10 +422,11 @@ public class ReplaceText extends AbstractProcessor {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
+ try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
+ final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
+
String oneLine;
- while (null != (oneLine = br.readLine())) {
+ while (null != (oneLine = demarcator.nextLine())) {
final String updatedValue = replacementValue.concat(oneLine);
bw.write(updatedValue);
}
@@ -461,10 +461,11 @@ public class ReplaceText extends AbstractProcessor {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset));) {
+ try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
+ final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
+
String oneLine;
- while (null != (oneLine = br.readLine())) {
+ while (null != (oneLine = demarcator.nextLine())) {
// we need to find the first carriage return or new-line so that we can append the new value
// before the line separate. However, we don't want to do this using a regular expression due
// to performance concerns. So we will find the first occurrence of either \r or \n and use
@@ -582,14 +583,15 @@ public class ReplaceText extends AbstractProcessor {
updatedFlowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
+ try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, 8192);
+ final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
+
String oneLine;
final StringBuffer sb = new StringBuffer();
Matcher matcher = null;
- while (null != (oneLine = br.readLine())) {
+ while (null != (oneLine = demarcator.nextLine())) {
additionalAttrs.clear();
if (matcher == null) {
matcher = searchPattern.matcher(oneLine);
@@ -649,14 +651,7 @@ public class ReplaceText extends AbstractProcessor {
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
-
- final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
- @Override
- public String decorate(final String attributeValue) {
- return Pattern.quote(attributeValue);
- }
- };
-
+ final AttributeValueDecorator quotedAttributeDecorator = Pattern::quote;
final String searchValue = context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
final int flowFileSize = (int) flowFile.getSize();
@@ -672,16 +667,34 @@ public class ReplaceText extends AbstractProcessor {
}
});
} else {
+ final int initialBufferSize = (int) Math.min(flowFile.getSize(), 8192);
+ final Pattern searchPattern = Pattern.compile(searchValue, Pattern.LITERAL);
+
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (NLKBufferedReader br = new NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
- BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
+ try (final LineDemarcator demarcator = new LineDemarcator(in, charset, maxBufferSize, initialBufferSize);
+ final BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out, charset))) {
+
String oneLine;
- while (null != (oneLine = br.readLine())) {
- // Interpreting the search and replacement values as char sequences
- final String updatedValue = oneLine.replace(searchValue, replacementValue);
- bw.write(updatedValue);
+ while (null != (oneLine = demarcator.nextLine())) {
+ int matches = 0;
+ int lastEnd = 0;
+
+ final Matcher matcher = searchPattern.matcher(oneLine);
+ while (matcher.find()) {
+ bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
+ bw.write(replacementValue);
+ matches++;
+
+ lastEnd = matcher.end();
+ }
+
+ if (matches > 0) {
+ bw.write(oneLine, lastEnd, oneLine.length() - lastEnd);
+ } else {
+ bw.write(oneLine);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
index a87434e..4fbbce6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
@@ -19,24 +19,6 @@ package org.apache.nifi.processors.standard;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
@@ -68,8 +50,24 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.NLKBufferedReader;
+import org.apache.nifi.stream.io.util.LineDemarcator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@EventDriven
@SideEffectFree
@@ -291,7 +289,7 @@ public class RouteText extends AbstractProcessor {
final Set<String> allDynamicProps = this.dynamicPropertyNames;
final Set<Relationship> newRelationships = new HashSet<>();
final String routeStrategy = configuredRouteStrategy;
- if (ROUTE_TO_MATCHING_PROPERTY_NAME.equals(routeStrategy)) {
+ if (ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
for (final String propName : allDynamicProps) {
newRelationships.add(new Relationship.Builder().name(propName).build());
}
@@ -419,14 +417,13 @@ public class RouteText extends AbstractProcessor {
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
- try (final Reader inReader = new InputStreamReader(in, charset);
- final NLKBufferedReader reader = new NLKBufferedReader(inReader)) {
+ try (final LineDemarcator demarcator = new LineDemarcator(in, charset, Integer.MAX_VALUE, 8192)) {
final Map<String, String> variables = new HashMap<>(2);
int lineCount = 0;
String line;
- while ((line = reader.readLine()) != null) {
+ while ((line = demarcator.nextLine()) != null) {
final String matchLine;
if (trim) {
@@ -550,11 +547,7 @@ public class RouteText extends AbstractProcessor {
private void appendLine(final ProcessSession session, final Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship relationship,
final FlowFile original, final String line, final Charset charset, final Group group) {
- Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.get(relationship);
- if (groupToFlowFileMap == null) {
- groupToFlowFileMap = new HashMap<>();
- flowFileMap.put(relationship, groupToFlowFileMap);
- }
+ final Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.computeIfAbsent(relationship, k -> new HashMap<>());
FlowFile flowFile = groupToFlowFileMap.get(group);
if (flowFile == null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
deleted file mode 100644
index df8847f..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Reader;
-
-//NLKBufferedReader = New Line Keeper Buffered Reader
-public class NLKBufferedReader extends BufferedReader {
- public NLKBufferedReader(Reader in, int sz) {
- super(in, sz);
- }
-
- public NLKBufferedReader(Reader in) {
- super(in);
- }
-
- /**
- * Reads a line of text in the same manner as {@link BufferedReader} except that any line-termination characters (\r and \n) are preserved in the String
- * that is returned from this reader, whereas {@link BufferedReader} will strip those out.
- *
- * @return A String containing the next line of text (including any line-termination characters) from the underlying Reader, or null if no more data is available
- *
- * @throws IOException If unable to read from teh underlying Reader
- */
- @Override
- public String readLine() throws IOException {
- final StringBuilder stringBuilder = new StringBuilder();
-
- int intchar = read();
- while (intchar != -1) {
- final char c = (char) intchar;
- stringBuilder.append(c);
-
- if (c == '\n') {
- break;
- } else if (c == '\r') {
- // Peek at next character, check if it's \n
- int charPeek = peek();
- if (charPeek == '\n') {
- stringBuilder.append((char) read());
- }
-
- break;
- }
-
- intchar = read();
- }
-
- final String result = stringBuilder.toString();
- return (result.length() == 0) ? null : result;
- }
-
- public int peek() throws IOException {
- mark(1);
- int readByte = read();
- reset();
-
- return readByte;
- }
-}
[2/2] nifi git commit: NIFI-5718: Added performance-based unit test
(Ignored) for LineDemarcator
Posted by pw...@apache.org.
NIFI-5718: Added performance-based unit test (Ignored) for LineDemarcator
Signed-off-by: Peter Wicks <pa...@gmail.com>
This closes #3100.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/830f7aa8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/830f7aa8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/830f7aa8
Branch: refs/heads/master
Commit: 830f7aa84d2f61781422daa43648a09c3a08f392
Parents: 564ad0c
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Nov 9 12:04:43 2018 -0500
Committer: Peter Wicks <pa...@gmail.com>
Committed: Fri Nov 9 14:27:32 2018 -0700
----------------------------------------------------------------------
.../nifi/stream/io/util/TestLineDemarcator.java | 28 ++++++++++++++++++++
1 file changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/830f7aa8/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
index 768a60a..85cf85e 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
@@ -16,10 +16,13 @@
*/
package org.apache.nifi.stream.io.util;
+import org.apache.nifi.stream.io.RepeatingInputStream;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
@@ -27,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -95,6 +99,30 @@ public class TestLineDemarcator {
assertEquals(Arrays.asList("\n", "The quick brown fox jumped over the lazy dog."), lines);
}
+ @Test
+ @Ignore("Intended only for manual testing. While this can take a while to run, it can be very helpful for manual testing before and after a change to the class. However, we don't want this to " +
+ "run in automated tests because we have no way to compare from one run to another, so it will only slow down automated tests.")
+ public void testPerformance() throws IOException {
+ final String lines = "The\nquick\nbrown\nfox\njumped\nover\nthe\nlazy\ndog.\r\n\n";
+ final byte[] bytes = lines.getBytes(StandardCharsets.UTF_8);
+
+ for (int i=0; i < 100; i++) {
+ final long start = System.nanoTime();
+
+ long count = 0;
+ try (final InputStream in = new RepeatingInputStream(bytes, 1_000_000);
+ final LineDemarcator demarcator = new LineDemarcator(in, StandardCharsets.UTF_8, 8192, 8192)) {
+
+ while (demarcator.nextLine() != null) {
+ count++;
+ }
+ }
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ System.out.println("Took " + millis + " millis to demarcate " + count + " lines");
+ }
+ }
+
private List<String> getLines(final String text) throws IOException {
return getLines(text, 8192, 8192);
}