You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/11/11 20:45:56 UTC
[2/2] nifi git commit: NIFI-2851 initial commit of perf improvements on SplitText
NIFI-2851 initial commit of perf improvements on SplitText
- introduced org.apache.nifi.stream.io.util.TextLineDemarcator
- refactored SplitText to use org.apache.nifi.stream.io.util.TextLineDemarcator
- updated SplitText's capability description to provide more clarity around splits with headers.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/41f519e8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/41f519e8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/41f519e8
Branch: refs/heads/master
Commit: 41f519e84cb5d0bf8be4ce93e26a042d16cda273
Parents: b73ba7f
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Oct 7 11:37:32 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Nov 11 15:45:40 2016 -0500
----------------------------------------------------------------------
.../nifi/stream/io/util/TextLineDemarcator.java | 253 ++++++
.../stream/io/util/TextLineDemarcatorTest.java | 242 ++++++
.../nifi/processors/standard/SplitText.java | 770 +++++++------------
.../nifi/processors/standard/TestSplitText.java | 20 +-
4 files changed, 772 insertions(+), 513 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
new file mode 100644
index 0000000..e4c213a
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of demarcator of text lines in the provided
+ * {@link InputStream}. It works similar to the {@link BufferedReader} and its
+ * {@link BufferedReader#readLine()} methods except that it does not create a
+ * String representing the text line and instead returns the offset info for the
+ * computed text line. See {@link #nextOffsetInfo()} and
+ * {@link #nextOffsetInfo(byte[])} for more details.
+ * <p>
+ * This class is NOT thread-safe.
+ * </p>
+ */
+public class TextLineDemarcator {
+
+ private final static int INIT_BUFFER_SIZE = 8192;
+
+ private final InputStream is;
+
+ private final int initialBufferSize;
+
+ private byte[] buffer;
+
+ private int index;
+
+ private int mark;
+
+ private long offset;
+
+ private int bufferLength;
+
+ /**
+ * Constructs an instance of demarcator with provided {@link InputStream}
+ * and default buffer size.
+ */
+ public TextLineDemarcator(InputStream is) {
+ this(is, INIT_BUFFER_SIZE);
+ }
+
+ /**
+ * Constructs an instance of demarcator with provided {@link InputStream}
+ * and initial buffer size.
+ */
+ public TextLineDemarcator(InputStream is, int initialBufferSize) {
+ if (is == null) {
+ throw new IllegalArgumentException("'is' must not be null.");
+ }
+ if (initialBufferSize < 1) {
+ throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
+ }
+ this.is = is;
+ this.initialBufferSize = initialBufferSize;
+ this.buffer = new byte[initialBufferSize];
+ }
+
+ /**
+ * Will compute the next <i>offset info</i> for a text line (line terminated
+ * by either '\r', '\n' or '\r\n'). <br>
+ * The <i>offset info</i> computed and returned as {@link OffsetInfo} where
+ * {@link OffsetInfo#isStartsWithMatch()} will always return true.
+ *
+ * @return offset info
+ */
+ public OffsetInfo nextOffsetInfo() {
+ return this.nextOffsetInfo(null);
+ }
+
+ /**
+ * Will compute the next <i>offset info</i> for a text line (line terminated
+ * by either '\r', '\n' or '\r\n'). <br>
+ * The <i>offset info</i> computed and returned as {@link OffsetInfo} where
+ * {@link OffsetInfo#isStartsWithMatch()} will return true if
+ * <code>startsWith</code> was successfully matched with the stsarting bytes
+ * of the text line.
+ *
+ * @return offset info
+ */
+ public OffsetInfo nextOffsetInfo(byte[] startsWith) {
+ OffsetInfo offsetInfo = null;
+ int lineLength = 0;
+ byte[] token = null;
+ lineLoop:
+ while (this.bufferLength != -1) {
+ if (this.index >= this.bufferLength) {
+ this.fill();
+ }
+ if (this.bufferLength != -1) {
+ int i;
+ byte byteVal;
+ for (i = this.index; i < this.bufferLength; i++) {
+ byteVal = this.buffer[i];
+ lineLength++;
+ int crlfLength = isEol(byteVal, i);
+ if (crlfLength > 0) {
+ i += crlfLength;
+ if (crlfLength == 2) {
+ lineLength++;
+ }
+ offsetInfo = new OffsetInfo(this.offset, lineLength, crlfLength);
+ if (startsWith != null) {
+ token = this.extractDataToken(lineLength);
+ }
+ this.index = i;
+ this.mark = this.index;
+ break lineLoop;
+ }
+ }
+ this.index = i;
+ }
+ }
+ // EOF where last char(s) are not CRLF.
+ if (lineLength > 0 && offsetInfo == null) {
+ offsetInfo = new OffsetInfo(this.offset, lineLength, 0);
+ if (startsWith != null) {
+ token = this.extractDataToken(lineLength);
+ }
+ }
+ this.offset += lineLength;
+
+ // checks if the new line starts with 'startsWith' chars
+ if (startsWith != null) {
+ for (int i = 0; i < startsWith.length; i++) {
+ byte sB = startsWith[i];
+ if (token != null && sB != token[i]) {
+ offsetInfo.setStartsWithMatch(0);
+ break;
+ }
+ }
+ }
+ return offsetInfo;
+ }
+
+ private int isEol(byte currentByte, int currentIndex) {
+ int crlfLength = 0;
+ if (currentByte == '\n') {
+ crlfLength = 1;
+ } else if (currentByte == '\r') {
+ if ((currentIndex + 1) >= this.bufferLength) {
+ this.index = currentIndex + 1;
+ this.fill();
+ }
+ currentByte = this.buffer[currentIndex + 1];
+ crlfLength = currentByte == '\n' ? 2 : 1;
+ }
+ return crlfLength;
+ }
+
+ private byte[] extractDataToken(int length) {
+ byte[] data = null;
+ if (length > 0) {
+ data = new byte[length];
+ System.arraycopy(this.buffer, this.mark, data, 0, data.length);
+ }
+ return data;
+ }
+
+ /**
+ * Will fill the current buffer from current 'index' position, expanding it
+ * and or shuffling it if necessary
+ */
+ private void fill() {
+ if (this.index >= this.buffer.length) {
+ if (this.mark == 0) { // expand
+ byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
+ System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
+ this.buffer = newBuff;
+ } else { // shuffle
+ int length = this.index - this.mark;
+ System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
+ this.index = length;
+ this.mark = 0;
+ }
+ }
+
+ try {
+ int bytesRead;
+ do {
+ bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+ } while (bytesRead == 0);
+ this.bufferLength = bytesRead != -1 ? this.index + bytesRead : -1;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed while reading InputStream", e);
+ }
+ }
+
+ /**
+ * Container to hold offset and meta info for a computed text line.
+ * The offset and meta info is represented with the following 4 values:
+ * <ul>
+ * <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
+ * <li><i>length</i> - length of the text line including CRLF characters</li>
+ * <li><i>crlfLength</i> - the length of the CRLF.
+ * Value 0 is returned if text line represents the last text line in the
+ * {@link InputStream} (i.e., EOF) and such line does not terminate with CR or LF or the combination of the two.
+ * Value 1 is returned if text line ends with '\n' or '\r'.
+ * Value 2 is returned if line ends with '\r\n').</li>
+ * <li><i>startsWithMatch</i> - <code>true</code> by default unless <code>startWith</code> bytes are provided and not matched.
+ * See {@link #nextOffsetInfo(byte[])} for more info.</li>
+ * </ul>
+ **/
+ public static class OffsetInfo {
+ private final long startOffset, length;
+ private final int crlfLength;
+
+ private boolean startsWithMatch = true;
+
+ OffsetInfo(long startOffset, long length, int crlfLength) {
+ this.startOffset = startOffset;
+ this.length = length;
+ this.crlfLength = crlfLength;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public int getCrlfLength() {
+ return this.crlfLength;
+ }
+
+ public boolean isStartsWithMatch() {
+ return this.startsWithMatch;
+ }
+
+ void setStartsWithMatch(int startsWithMatch) {
+ this.startsWithMatch = startsWithMatch == 1 ? true : false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
new file mode 100644
index 0000000..3b64a17
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
+import org.junit.Test;
+
+public class TextLineDemarcatorTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nullStream() {
+ new TextLineDemarcator(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void illegalBufferSize() {
+ new TextLineDemarcator(mock(InputStream.class), -234);
+ }
+
+ @Test
+ public void emptyStreamNoStartWithFilter() {
+ String data = "";
+ InputStream is = stringToIs(data);
+ TextLineDemarcator demarcator = new TextLineDemarcator(is);
+ assertNull(demarcator.nextOffsetInfo());
+ }
+
+ @Test
+ public void emptyStreamAndStartWithFilter() {
+ String data = "";
+ InputStream is = stringToIs(data);
+ TextLineDemarcator demarcator = new TextLineDemarcator(is);
+ assertNull(demarcator.nextOffsetInfo("hello".getBytes()));
+ }
+
+ @Test
+ public void singleCR() {
+ InputStream is = stringToIs("\r");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is);
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(0, offsetInfo.getStartOffset());
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+ }
+
+ @Test
+ public void singleLF() {
+ InputStream is = stringToIs("\n");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is);
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(0, offsetInfo.getStartOffset());
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+ }
+
+ @Test
+ // essentially validates the internal 'isEol()' operation to ensure it will perform read-ahead
+ public void crlfWhereLFdoesNotFitInInitialBuffer() throws Exception {
+ InputStream is = stringToIs("oleg\r\njoe");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is, 5);
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(0, offsetInfo.getStartOffset());
+ assertEquals(6, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(6, offsetInfo.getStartOffset());
+ assertEquals(3, offsetInfo.getLength());
+ assertEquals(0, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+ }
+
+ @Test
+ public void mixedCRLF() throws Exception {
+ InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(0, offsetInfo.getStartOffset());
+ assertEquals(5, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(5, offsetInfo.getStartOffset());
+ assertEquals(4, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(9, offsetInfo.getStartOffset());
+ assertEquals(6, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo();
+ assertEquals(15, offsetInfo.getStartOffset());
+ assertEquals(11, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+ }
+
+ @Test
+ public void consecutiveAndMixedCRLF() throws Exception {
+ InputStream is = stringToIs("oleg\r\r\njoe\n\n\rjack\n\r\nstacymike\r\n\n\n\r");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
+
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); // oleg\r
+ assertEquals(5, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \r\n
+ assertEquals(2, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // joe\n
+ assertEquals(4, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \n
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \r
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // jack\n
+ assertEquals(5, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \r\n
+ assertEquals(2, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // stacymike\r\n
+ assertEquals(11, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \n
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \n
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+
+ offsetInfo = demarcator.nextOffsetInfo(); // \r
+ assertEquals(1, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ }
+
+ @Test
+ public void startWithNoMatchOnWholeStream() throws Exception {
+ InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
+
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes());
+ assertEquals(0, offsetInfo.getStartOffset());
+ assertEquals(5, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertFalse(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo("foo".getBytes());
+ assertEquals(5, offsetInfo.getStartOffset());
+ assertEquals(4, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertFalse(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo("joe".getBytes());
+ assertEquals(9, offsetInfo.getStartOffset());
+ assertEquals(6, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertFalse(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo("stasy".getBytes());
+ assertEquals(15, offsetInfo.getStartOffset());
+ assertEquals(11, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertFalse(offsetInfo.isStartsWithMatch());
+ }
+
+ @Test
+ public void startWithSomeMatches() throws Exception {
+ InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
+ TextLineDemarcator demarcator = new TextLineDemarcator(is, 7);
+
+ OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes());
+ assertEquals(0, offsetInfo.getStartOffset());
+ assertEquals(5, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertFalse(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo("jo".getBytes());
+ assertEquals(5, offsetInfo.getStartOffset());
+ assertEquals(4, offsetInfo.getLength());
+ assertEquals(1, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo("joe".getBytes());
+ assertEquals(9, offsetInfo.getStartOffset());
+ assertEquals(6, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertFalse(offsetInfo.isStartsWithMatch());
+
+ offsetInfo = demarcator.nextOffsetInfo("stacy".getBytes());
+ assertEquals(15, offsetInfo.getStartOffset());
+ assertEquals(11, offsetInfo.getLength());
+ assertEquals(2, offsetInfo.getCrlfLength());
+ assertTrue(offsetInfo.isStartsWithMatch());
+ }
+
+ private InputStream stringToIs(String data) {
+ return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index 81c06f8..2cbcd44 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -16,13 +16,11 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.BitSet;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -31,9 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
@@ -46,26 +42,22 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.util.TextLineDemarcator;
+import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
@EventDriven
@SideEffectFree
@@ -76,7 +68,12 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
+ "or total size of fragment. Each output split file will contain no more than the configured number of lines or bytes. "
+ "If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever limit is reached first. "
+ "If the first line of a fragment exceeds the Maximum Fragment Size, that line will be output in a single split file which "
- +" exceeds the configured maximum size limit.")
+ + "exceeds the configured maximum size limit. This component also allows one to specify that each split should include a header "
+ + "lines. Header lines can be computed by either specifying the amount of lines that should constitute a header or by using header "
+ + "marker to match against the read lines. If such match happens then the corresponding line will be treated as header. Keep in mind "
+ + "that upon the first failure of header marker match, no more marches will be performed and the rest of the data will be parsed as "
+ + "regular lines for a given split. If after computation of the header there are no more data, the resulting split will consists "
+ + "of only header lines.")
@WritesAttributes({
@WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"),
@WritesAttribute(attribute = "fragment.size", description = "The number of bytes from the original FlowFile that were copied to this FlowFile, " +
@@ -87,7 +84,6 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
@WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")})
@SeeAlso(MergeContent.class)
public class SplitText extends AbstractProcessor {
-
// attribute keys
public static final String SPLIT_LINE_COUNT = "text.line.count";
public static final String FRAGMENT_SIZE = "fragment.size";
@@ -150,548 +146,316 @@ public class SplitText extends AbstractProcessor {
.description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
.build();
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(LINE_SPLIT_COUNT);
- properties.add(FRAGMENT_MAX_SIZE);
- properties.add(HEADER_LINE_COUNT);
- properties.add(HEADER_MARKER);
- properties.add(REMOVE_TRAILING_NEWLINES);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_ORIGINAL);
- relationships.add(REL_SPLITS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
+ private static final List<PropertyDescriptor> properties;
+ private static final Set<Relationship> relationships;
+
+ static {
+ properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{
+ LINE_SPLIT_COUNT,
+ FRAGMENT_MAX_SIZE,
+ HEADER_LINE_COUNT,
+ HEADER_MARKER,
+ REMOVE_TRAILING_NEWLINES
+ }));
+
+ relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{
+ REL_ORIGINAL,
+ REL_SPLITS,
+ REL_FAILURE
+ })));
}
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- List<ValidationResult> results = new ArrayList<>();
+ private volatile boolean removeTrailingNewLines;
- final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
- && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
+ private volatile long maxSplitSize;
- results.add(new ValidationResult.Builder()
- .subject("Maximum Fragment Size")
- .valid(!invalidState)
- .explanation("Property must be specified when Line Split Count is 0")
- .build()
- );
- return results;
- }
+ private volatile int lineCount;
+
+ private volatile int headerLineCount;
+
+ private volatile String headerMarker;
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
- final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
- final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
- byte[] leadingBytes = leadingNewLineBytes;
- int numLines = 0;
- long totalBytes = 0L;
- for (int i = 0; i < maxNumLines; i++) {
- final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
- final long bytes = eolMarker.getBytesConsumed();
- leadingBytes = eolMarker.getLeadingNewLineBytes();
-
- if (includeLineDelimiter && out != null) {
- if (leadingBytes != null) {
- out.write(leadingBytes);
- leadingBytes = null;
- }
- eolBuffer.drainTo(out);
- }
- totalBytes += bytes;
- if (bytes <= 0) {
- return numLines;
- }
- numLines++;
- if (totalBytes >= maxByteCount) {
- break;
- }
- }
- return numLines;
+ @OnScheduled
+ public void onSchedule(ProcessContext context) {
+ this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
+ ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
+ this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
+ ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
+ this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+ this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
+ this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
}
- private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
- final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
- long bytesRead = 0L;
- final ByteArrayOutputStream buffer;
- if (out != null) {
- buffer = new ByteArrayOutputStream();
- } else {
- buffer = null;
+ /**
+ * Will split the incoming stream releasing all splits as FlowFile at once.
+ */
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
+ FlowFile sourceFlowFile = processSession.get();
+ if (sourceFlowFile == null) {
+ return;
}
- byte[] bytesToWriteFirst = leadingNewLineBytes;
-
- in.mark(Integer.MAX_VALUE);
- while (true) {
- final int nextByte = in.read();
-
- // if we hit end of stream we're done
- if (nextByte == -1) {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- }
- return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst); // bytesToWriteFirst should be "null"?
- }
-
- // Verify leading bytes do not violate size limitation
- if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
- return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
- }
- // Write leadingNewLines, if appropriate
- if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
- bytesRead += bytesToWriteFirst.length;
- buffer.write(bytesToWriteFirst);
- bytesToWriteFirst = null;
- }
- // buffer the output
- bytesRead++;
- if (buffer != null && nextByte != '\n' && nextByte != '\r') {
- if (bytesToWriteFirst != null) {
- buffer.write(bytesToWriteFirst);
- }
- bytesToWriteFirst = null;
- eolBuffer.drainTo(buffer);
- eolBuffer.clear();
- buffer.write(nextByte);
- }
-
- // check the size limit
- if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
- in.reset();
- if (buffer != null) {
- buffer.close();
- }
- return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
- }
-
- // if we have a new line, then we're done
- if (nextByte == '\n') {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- eolBuffer.addEndOfLine(false, true);
- }
- return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
- }
-
- // Determine if \n follows \r; in either case, end of line has been reached
- if (nextByte == '\r') {
- if (buffer != null) {
- buffer.writeTo(out);
- buffer.close();
- }
- in.mark(1);
- final int lookAheadByte = in.read();
- if (lookAheadByte == '\n') {
- eolBuffer.addEndOfLine(true, true);
- return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
- } else {
- in.reset();
- eolBuffer.addEndOfLine(true, false);
- return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
+ AtomicBoolean error = new AtomicBoolean();
+ List<SplitInfo> computedSplitsInfo = new ArrayList<>();
+ AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
+ processSession.read(sourceFlowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ TextLineDemarcator demarcator = new TextLineDemarcator(in);
+ SplitInfo splitInfo = null;
+ long startOffset = 0;
+
+ // Compute fragment representing the header (if available)
+ long start = System.nanoTime();
+ try {
+ if (SplitText.this.headerLineCount > 0) {
+ splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
+ if (splitInfo.lineCount < SplitText.this.headerLineCount) {
+ error.set(true);
+ getLogger().error("Unable to split " + sourceFlowFile + " due to insufficient amount of header lines. Required "
+ + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
+ }
+ } else if (SplitText.this.headerMarker != null) {
+ splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
+ }
+ headerSplitInfoRef.set(splitInfo);
+ } catch (IllegalStateException e) {
+ error.set(true);
+ getLogger().error(e.getMessage() + " Routing to failure.", e);
}
- }
- }
- }
- private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
- final long bufferedBytes) throws IOException {
- final SplitInfo info = new SplitInfo();
- final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
- int lastByte = -1;
- info.lengthBytes = bufferedBytes;
- long lastEolBufferLength = 0L;
-
- while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
- && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
- && eolBuffer.length() < maxSize) {
- in.mark(1);
- final int nextByte = in.read();
- // Check for \n following \r on last line
- if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
- in.reset();
- break;
- }
- switch (nextByte) {
- case -1:
- info.endOfStream = true;
- if (keepAllNewLines) {
- info.lengthBytes += eolBuffer.length();
+ // Compute and collect fragments representing the individual splits
+ if (!error.get()) {
+ if (headerSplitInfoRef.get() != null) {
+ startOffset = headerSplitInfoRef.get().length;
}
- if (lastByte != '\r') {
- info.lengthLines++;
+ long preAccumulatedLength = startOffset;
+ while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
+ computedSplitsInfo.add(splitInfo);
+ startOffset += splitInfo.length;
}
- info.bufferedBytes = 0;
- return info;
- case '\r':
- eolBuffer.addEndOfLine(true, false);
- info.lengthLines++;
- info.bufferedBytes = 0;
- break;
- case '\n':
- eolBuffer.addEndOfLine(false, true);
- if (lastByte != '\r') {
- info.lengthLines++;
+ long stop = System.nanoTime();
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
}
- info.bufferedBytes = 0;
- break;
- default:
- if (eolBuffer.length() > 0) {
- info.lengthBytes += eolBuffer.length();
- lastEolBufferLength = eolBuffer.length();
- eolBuffer.clear();
- }
- info.lengthBytes++;
- info.bufferedBytes++;
- break;
+ }
}
- lastByte = nextByte;
- }
- // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
- if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
- info.lengthBytes -= lastEolBufferLength;
- }
- if (keepAllNewLines) {
- info.lengthBytes += eolBuffer.length();
- }
- return info;
- }
+ });
- private int countHeaderLines(final ByteCountingInputStream in,
- final String headerMarker) throws IOException {
- int headerInfo = 0;
-
- final BufferedReader br = new BufferedReader(new InputStreamReader(in));
- in.mark(Integer.MAX_VALUE);
- String line = br.readLine();
- while (line != null) {
- // if line is not a header line, reset stream and return header counts
- if (!line.startsWith(headerMarker)) {
- in.reset();
- return headerInfo;
- } else {
- headerInfo++;
- }
- line = br.readLine();
+ if (error.get()){
+ processSession.transfer(sourceFlowFile, REL_FAILURE);
+ } else {
+ List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+ processSession.transfer(sourceFlowFile, REL_ORIGINAL);
+ processSession.transfer(splitFlowFiles, REL_SPLITS);
}
- in.reset();
- return headerInfo;
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- final FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final ComponentLog logger = getLogger();
- final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
- final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
- ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
- final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
- ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
- final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
- final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
- final AtomicReference<String> errorMessage = new AtomicReference<>(null);
- final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
- final long startNanos = System.nanoTime();
- final List<FlowFile> splits = new ArrayList<>();
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws IOException {
- try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
- final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
- long bufferedPartialLine = 0;
-
- // if we have header lines, copy them into a ByteArrayOutputStream
- final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
- // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
- int headerInfoLineCount = 0;
- if (headerCount > 0) {
- headerInfoLineCount = headerCount;
- } else {
- if (headerMarker != null) {
- headerInfoLineCount = countHeaderLines(in, headerMarker);
- }
- }
-
- final byte[] headerNewLineBytes;
- final byte[] headerBytesWithoutTrailingNewLines;
- if (headerInfoLineCount > 0) {
- final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
-
- if (headerLinesCopied < headerInfoLineCount) {
- errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
- return;
- }
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ // A Line Split Count of 0 leaves the line count unbounded (see nextSplit's 'splitMaxLineCount > 0' check),
+ // so Maximum Fragment Size must then be set to bound the size of each split.
+ final boolean lineCountUnbounded = validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0;
+ final boolean splitIsBounded = !lineCountUnbounded || validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet();
+ final ValidationResult fragmentSizeResult = new ValidationResult.Builder()
+ .subject("Maximum Fragment Size")
+ .valid(splitIsBounded)
+ .explanation("Property must be specified when Line Split Count is 0")
+ .build();
+ final List<ValidationResult> results = new ArrayList<>();
+ results.add(fragmentSizeResult);
+ return results;
+ }
- // Break header apart into trailing newlines and remaining text
- final byte[] headerBytes = headerStream.toByteArray();
- int headerNewLineByteCount = 0;
- for (int i = headerBytes.length - 1; i >= 0; i--) {
- final byte headerByte = headerBytes[i];
-
- if (headerByte == '\r' || headerByte == '\n') {
- headerNewLineByteCount++;
- } else {
- break;
- }
- }
+ /**
+ * Exposes the {@code properties} list as the property descriptors supported by this processor.
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
- if (headerNewLineByteCount == 0) {
- headerNewLineBytes = null;
- headerBytesWithoutTrailingNewLines = headerBytes;
+ /**
+ * Generates the list of {@link FlowFile}s representing splits. If
+ * {@link SplitInfo} provided as an argument to this operation is not null
+ * it signifies the header information and its contents will be included in
+ * each and every computed split.
+ *
+ * If the content consisted of a header only (no computed splits), a single split
+ * holding the header without its last line ending is produced. If there is neither
+ * a header nor any computed split, no splits are produced.
+ *
+ * The 'fragment.count' attribute reflects the number of splits actually emitted. This can be
+ * lower than the number of computed splits, because zero-length splits are skipped when
+ * there is no header.
+ */
+ private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
+ List<FlowFile> splitFlowFiles = new ArrayList<>();
+ FlowFile headerFlowFile = null;
+ long headerCrlfLength = 0;
+ if (splitInfo != null) {
+ headerFlowFile = processSession.clone(sourceFlowFile, splitInfo.startOffset, splitInfo.length);
+ // for the header SplitInfo, 'trimmedLength' carries the length of the header's last line ending
+ headerCrlfLength = splitInfo.trimmedLength;
+ }
+ int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
+ String fragmentId = UUID.randomUUID().toString();
+
+ if (computedSplitsInfo.isEmpty() && headerFlowFile != null) {
+ // header-only content: emit exactly one split containing just the header
+ FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
+ splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(),
+ fragmentId, fragmentIndex++, 1, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ splitFlowFiles.add(splitFlowFile);
+ } else {
+ // Count upfront the splits that will actually be emitted (same condition as 'proceedWithClone' below)
+ // so that 'fragment.count' matches the number of fragments downstream processors will receive.
+ int emittedSplitCount = 0;
+ for (SplitInfo candidateSplitInfo : computedSplitsInfo) {
+ long candidateLength = SplitText.this.removeTrailingNewLines ? candidateSplitInfo.trimmedLength : candidateSplitInfo.length;
+ if (headerFlowFile != null || candidateLength > 0) {
+ emittedSplitCount++;
+ }
+ }
+ for (SplitInfo computedSplitInfo : computedSplitsInfo) {
+ long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
+ boolean proceedWithClone = headerFlowFile != null || length > 0;
+ if (proceedWithClone) {
+ FlowFile splitFlowFile = null;
+ if (headerFlowFile != null) {
+ if (length > 0) {
+ splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
+ splitFlowFile = processSession.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
} else {
- headerNewLineBytes = new byte[headerNewLineByteCount];
- System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
-
- headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
- System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
+ splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
}
} else {
- headerBytesWithoutTrailingNewLines = null;
- headerNewLineBytes = null;
+ splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
}
- while (true) {
- if (headerInfoLineCount > 0) {
- // if we have header lines, create a new FlowFile, copy the header lines to that file,
- // and then start copying lines
- final AtomicInteger linesCopied = new AtomicInteger(0);
- final AtomicLong bytesCopied = new AtomicLong(0L);
- FlowFile splitFile = session.create(flowFile);
- try {
- splitFile = session.write(splitFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream rawOut) throws IOException {
- try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
- final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
- countingOut.write(headerBytesWithoutTrailingNewLines);
- //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
- linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
- includeLineDelimiter, headerNewLineBytes));
- bytesCopied.set(countingOut.getBytesWritten());
- }
- }
- });
- splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
- splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
- logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
- } finally {
- if (linesCopied.get() > 0) {
- splits.add(splitFile);
- } else {
- // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
- // the last flow file will contain just a header; don't forward that one
- session.remove(splitFile);
- }
- }
-
- // Check for EOF
- in.mark(1);
- if (in.read() == -1) {
- break;
- }
- in.reset();
-
- } else {
- // We have no header lines, so we can simply demarcate the original File via the
- // ProcessSession#clone method.
- long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
- final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
- if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
- bufferedPartialLine = info.bufferedBytes;
- }
- if (info.endOfStream) {
- // stream is out of data
- if (info.lengthBytes > 0) {
- info.offsetBytes = beforeReadingLines;
- splitInfos.add(info);
- final long procNanos = System.nanoTime() - startNanos;
- final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
- logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
- + "total splits = {}; total processing time = {} ms",
- new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
- }
- break;
- } else {
- if (info.lengthBytes != 0) {
- info.offsetBytes = beforeReadingLines;
- info.lengthBytes -= bufferedPartialLine;
- splitInfos.add(info);
- final long procNanos = System.nanoTime() - startNanos;
- final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
- logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
- + "total splits = {}; total processing time = {} ms",
- new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
- }
- }
- }
- }
+ splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
+ emittedSplitCount, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ splitFlowFiles.add(splitFlowFile);
}
}
- });
-
- if (errorMessage.get() != null) {
- logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
- session.transfer(flowFile, REL_FAILURE);
- if (!splits.isEmpty()) {
- session.remove(splits);
- }
- return;
}
- if (!splitInfos.isEmpty()) {
- // Create the splits
- for (final SplitInfo info : splitInfos) {
- FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
- split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
- split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
- splits.add(split);
- }
+ getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
+ if (headerFlowFile != null) {
+ processSession.remove(headerFlowFile);
}
- finishFragmentAttributes(session, flowFile, splits);
-
- if (splits.size() > 10) {
- logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
- } else {
- logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
- }
-
- session.transfer(flowFile, REL_ORIGINAL);
- session.transfer(splits, REL_SPLITS);
+ return splitFlowFiles;
}
- private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
- final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
-
- final String fragmentId = UUID.randomUUID().toString();
- final ArrayList<FlowFile> newList = new ArrayList<>(splits);
- splits.clear();
- for (int i = 1; i <= newList.size(); i++) {
- FlowFile ff = newList.get(i - 1);
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(FRAGMENT_ID, fragmentId);
- attributes.put(FRAGMENT_INDEX, String.valueOf(i));
- attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
- attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
- FlowFile newFF = session.putAllAttributes(ff, attributes);
- splits.add(newFF);
- }
+ /**
+ * Stamps a split FlowFile with its line count, size and fragment attributes in a single
+ * putAllAttributes call, and returns the updated FlowFile.
+ * NOTE(review): FRAGMENT_SIZE is taken from splitFlowFile.getSize(); the 'splitFlowFileSize'
+ * argument is currently not consulted.
+ */
+ private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize,
+ String splitId, int splitIndex, int splitCount, String origFileName) {
+ final Map<String, String> fragmentAttributes = new HashMap<>();
+ fragmentAttributes.put(SPLIT_LINE_COUNT, Long.toString(splitLineCount));
+ fragmentAttributes.put(FRAGMENT_SIZE, Long.toString(splitFlowFile.getSize()));
+ fragmentAttributes.put(FRAGMENT_ID, splitId);
+ fragmentAttributes.put(FRAGMENT_INDEX, Integer.toString(splitIndex));
+ fragmentAttributes.put(FRAGMENT_COUNT, Integer.toString(splitCount));
+ fragmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName);
+ final FlowFile updatedSplit = processSession.putAllAttributes(splitFlowFile, fragmentAttributes);
+ return updatedSplit;
}
- private static class SplitInfo {
-
- public long offsetBytes;
- public long lengthBytes;
- public long lengthLines;
- public long bufferedBytes;
- public boolean endOfStream;
-
- public SplitInfo() {
- this.offsetBytes = 0L;
- this.lengthBytes = 0L;
- this.lengthLines = 0L;
- this.bufferedBytes = 0L;
- this.endOfStream = false;
- }
- }
-
- public static class EndOfLineBuffer {
- private static final byte CARRIAGE_RETURN = (byte) '\r';
- private static final byte NEWLINE = (byte) '\n';
-
- private final BitSet buffer = new BitSet();
- private int index = 0;
-
- public void clear() {
- index = 0;
- }
-
- public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
- buffer.set(index++, carriageReturn);
- buffer.set(index++, newLine);
- }
-
- private void drainTo(final OutputStream out) throws IOException {
- for (int i = 0; i < index; i += 2) {
- final boolean cr = buffer.get(i);
- final boolean nl = buffer.get(i + 1);
-
- // we've consumed all data in the buffer
- if (!cr && !nl) {
- return;
- }
-
- if (cr) {
- out.write(CARRIAGE_RETURN);
+ /**
+ * Will generate {@link SplitInfo} for the fragment that represents the
+ * header of the future splits.
+ *
+ * Header lines are read until 'splitMaxLineCount' lines have been consumed. If a
+ * 'startsWithFilter' is provided (see {@link #HEADER_MARKER}), reading also stops at the
+ * first line that does not start with the filter. That non-matching line is not part of
+ * the header. It is recorded in the returned {@link SplitInfo} (unless its CRLF length is -1)
+ * so it can be carried over into the first split.
+ *
+ * The header may never be larger than {@link #maxSplitSize}; if it would be, an
+ * {@link IllegalStateException} is thrown.
+ *
+ * Note that the 'trimmedLength' of the returned {@link SplitInfo} carries the length of the
+ * line ending of the last header line.
+ *
+ * @return the header {@link SplitInfo}, or null if no header line was read
+ */
+ private SplitInfo computeHeader(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, byte[] startsWithFilter, SplitInfo previousSplitInfo) {
+ long length = 0;
+ long actualLineCount = 0;
+ OffsetInfo offsetInfo = null;
+ SplitInfo splitInfo = null;
+ OffsetInfo previousOffsetInfo = null;
+ // line-ending length of the last line accepted into the header (used to trim header-only splits)
+ long lastCrlfLength = 0;
+ while ((offsetInfo = demarcator.nextOffsetInfo(startsWithFilter)) != null) {
+ if (startsWithFilter != null && !offsetInfo.isStartsWithMatch()) {
+ // first non-header line: keep it so it can be carried over into the first split
+ if (offsetInfo.getCrlfLength() != -1) {
+ previousOffsetInfo = offsetInfo;
}
-
- if (nl) {
- out.write(NEWLINE);
+ break;
+ } else {
+ if (length + offsetInfo.getLength() > this.maxSplitSize) {
+ throw new IllegalStateException( "Computing header resulted in header size being > MAX split size of " + this.maxSplitSize + ".");
+ } else {
+ length += offsetInfo.getLength();
+ actualLineCount++;
+ // only lines that belong to the header may determine its trailing line-ending length;
+ // a non-matching (body) line must not overwrite it
+ lastCrlfLength = offsetInfo.getCrlfLength();
+ if (actualLineCount == splitMaxLineCount) {
+ break;
+ }
+ }
}
-
- clear();
}
- /**
- * @return the number of line endings in the buffer
- */
- public int length() {
- return index / 2;
+ if (actualLineCount > 0) {
+ splitInfo = new SplitInfo(startOffset, length, lastCrlfLength, actualLineCount, previousOffsetInfo);
}
+ return splitInfo;
}
- public static class EndOfLineMarker {
- private final long bytesConsumed;
- private final EndOfLineBuffer eolBuffer;
- private final boolean streamEnded;
- private final byte[] leadingNewLineBytes;
-
- public EndOfLineMarker(final long bytesCounted, final EndOfLineBuffer eolBuffer, final boolean streamEnded, final byte[] leadingNewLineBytes) {
- this.bytesConsumed = bytesCounted;
- this.eolBuffer = eolBuffer;
- this.streamEnded = streamEnded;
- this.leadingNewLineBytes = leadingNewLineBytes;
+ /**
+ * Will generate {@link SplitInfo} for the next split.
+ *
+ * If split size is controlled by the amount of lines in the split then the resulting
+ * {@link SplitInfo} line count will always be <= 'splitMaxLineCount' (a line carried over
+ * from the previous split counts towards that limit).
+ * If split size is controlled by the {@link #maxSplitSize}, then the resulting {@link SplitInfo}
+ * line count will vary, and the length of the split (plus 'startingLength') will not exceed
+ * {@link #maxSplitSize}. The exception is a single line that is larger than that limit on its own,
+ * which forms a split by itself. A line that would push the split over the limit is not included;
+ * it is recorded in the returned {@link SplitInfo} and carried over into the next split.
+ *
+ * Only lines that are actually part of the split contribute to the computation of its
+ * trimmed length (i.e. the length without trailing line endings).
+ *
+ * @return the next {@link SplitInfo}, or null if there are no more lines
+ */
+ private SplitInfo nextSplit(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, SplitInfo remainderSplitInfo, long startingLength) {
+ long length = 0;
+ long trailingCrlfLength = 0;
+ long lastCrlfLength = 0;
+ long actualLineCount = 0;
+ OffsetInfo offsetInfo = null;
+ SplitInfo splitInfo = null;
+ // the remainder from the previous read after which it was determined that adding it would make
+ // the split size > 'maxSplitSize'. So it's being carried over to the next line.
+ if (remainderSplitInfo != null && remainderSplitInfo.remaningOffsetInfo != null) {
+ OffsetInfo carriedOverOffsetInfo = remainderSplitInfo.remaningOffsetInfo;
+ length += carriedOverOffsetInfo.getLength();
+ actualLineCount++;
+ // the carried-over line is part of this split, so it takes part in trailing line-ending trimming
+ lastCrlfLength = carriedOverOffsetInfo.getCrlfLength();
+ if (carriedOverOffsetInfo.getLength() == carriedOverOffsetInfo.getCrlfLength()) {
+ trailingCrlfLength = carriedOverOffsetInfo.getCrlfLength();
+ }
}
+ OffsetInfo remaningOffsetInfo = null;
+ // stop before reading another line once the line limit is reached (the carried-over line counts too)
+ while ((splitMaxLineCount <= 0 || actualLineCount < splitMaxLineCount)
+ && (offsetInfo = demarcator.nextOffsetInfo()) != null) {
+ boolean exceedsMaxSplitSize = length + offsetInfo.getLength() + startingLength > this.maxSplitSize;
+ if (exceedsMaxSplitSize && length > 0) {
+ // adding this line would make the split too large: carry it over to the next split.
+ // It is NOT part of this split, so it must not affect the trimming bookkeeping below.
+ remaningOffsetInfo = offsetInfo;
+ break;
+ }
+
+ // the line is part of this split
+ length += offsetInfo.getLength();
+ actualLineCount++;
+ lastCrlfLength = offsetInfo.getCrlfLength();
+ if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) {
+ trailingCrlfLength += offsetInfo.getCrlfLength();
+ } else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) {
+ trailingCrlfLength = 0; // non-empty line came in, thus resetting counter
+ }
- public long getBytesConsumed() {
- return bytesConsumed;
+
+ if (exceedsMaxSplitSize) {
+ break; // single line per split: this line alone already exceeds 'maxSplitSize'
+ }
}
- public EndOfLineBuffer getEndOfLineBuffer() {
- return eolBuffer;
+ if (actualLineCount > 0) {
+ if (length - trailingCrlfLength >= lastCrlfLength) {
+ trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line
+ }
+ splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo);
}
+ return splitInfo;
+ }
- public boolean isStreamEnded() {
- return streamEnded;
+ /**
+ * Container for hosting meta-information pertaining to the split so it can
+ * be used later to create {@link FlowFile} representing the split.
+ * Declared static: it never uses the enclosing {@link SplitText} instance, so
+ * instances should not carry a hidden reference to the processor.
+ */
+ private static class SplitInfo {
+ /** offset of the split within the source content */
+ final long startOffset;
+ /** length of the split in bytes, including line endings */
+ final long length;
+ /**
+ * length of the split without its trailing line endings; for the header SplitInfo
+ * built by computeHeader this holds the length of the last header line ending instead
+ */
+ final long trimmedLength;
+ /** number of lines in the split */
+ final long lineCount;
+ /** line that was read but not included in this split and is carried over into the next one (may be null) */
+ OffsetInfo remaningOffsetInfo;
+
+ SplitInfo(long startOffset, long length, long trimmedLength, long lineCount, OffsetInfo remaningOffsetInfo) {
+ this.startOffset = startOffset;
+ this.length = length;
+ this.lineCount = lineCount;
+ this.remaningOffsetInfo = remaningOffsetInfo;
+ this.trimmedLength = trimmedLength;
}
- public byte[] getLeadingNewLineBytes() {
- return leadingNewLineBytes;
+ @Override
+ public String toString() {
+ return "offset:" + startOffset + "; length:" + length + "; lineCount:" + lineCount;
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
index c7ac4f6..1a2089c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
@@ -17,12 +17,6 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-
-import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
@@ -30,6 +24,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
public class TestSplitText {
final String originalFilename = "original.txt";
@@ -176,7 +176,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
- runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+ runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat with header cou8nt versus header marker
runner.clearTransferState();
@@ -189,7 +189,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
- runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+ runner.assertTransferCount(SplitText.REL_SPLITS, 1);
// repeat single header line with no newline characters
runner.clearTransferState();
@@ -202,7 +202,7 @@ public class TestSplitText {
runner.assertTransferCount(SplitText.REL_FAILURE, 0);
runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
- runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+ runner.assertTransferCount(SplitText.REL_SPLITS, 1);
}
@Test
@@ -813,7 +813,7 @@ public class TestSplitText {
final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
splits.get(0).assertContentEquals("\n\n1");
- splits.get(1).assertContentEquals("");
+ splits.get(1).assertContentEquals("\n");
}
}