You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/11/11 20:45:55 UTC

[1/2] nifi git commit: NIFI-2851: Added additional unit test to ensure correctness of demarcation when demarcator falls between buffered data

Repository: nifi
Updated Branches:
  refs/heads/master b73ba7f8d -> ad9247459


NIFI-2851: Added additional unit test to ensure correctness of demarcation when demarcator falls between buffered data

This closes #1116.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad924745
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad924745
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad924745

Branch: refs/heads/master
Commit: ad924745933c3e018e0d78282d9d085978b8cd2b
Parents: 41f519e
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Nov 11 15:03:44 2016 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Nov 11 15:45:40 2016 -0500

----------------------------------------------------------------------
 .../stream/io/util/TextLineDemarcatorTest.java  | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ad924745/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
index 3b64a17..ce66cad 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
@@ -18,11 +18,13 @@ package org.apache.nifi.stream.io.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 
@@ -236,6 +238,28 @@ public class TextLineDemarcatorTest {
         assertTrue(offsetInfo.isStartsWithMatch());
     }
 
+    /**
+     * Verifies demarcation when the line terminator is not part of the first
+     * buffer's worth of data. The input is "Yes\nNo" and the initial buffer
+     * size is 3, so the first three bytes ("Yes") contain no terminator and
+     * the '\n' is only seen after more data has been read. The last line
+     * ("No") has no trailing terminator, so it is expected to be reported
+     * with a CRLF length of 0, followed by <code>null</code> at end of stream.
+     */
+    @Test
+    public void testOnBufferSplitNoTrailingDelimiter() throws IOException {
+        final byte[] inputData = "Yes\nNo".getBytes(StandardCharsets.UTF_8);
+        final ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+        final TextLineDemarcator demarcator = new TextLineDemarcator(is, 3);
+
+        final OffsetInfo first = demarcator.nextOffsetInfo();
+        final OffsetInfo second = demarcator.nextOffsetInfo();
+        final OffsetInfo third = demarcator.nextOffsetInfo();
+        assertNotNull(first);
+        assertNotNull(second);
+        assertNull(third);
+
+        // "Yes\n": starts at 0, 4 bytes including the single-byte terminator
+        assertEquals(0, first.getStartOffset());
+        assertEquals(4, first.getLength());
+        assertEquals(1, first.getCrlfLength());
+
+        // "No": starts right after the first line, no terminator before EOF
+        assertEquals(4, second.getStartOffset());
+        assertEquals(2, second.getLength());
+        assertEquals(0, second.getCrlfLength());
+    }
+
     private InputStream stringToIs(String data) {
         return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
     }


[2/2] nifi git commit: NIFI-2851 initial commit of perf improvements on SplitText

Posted by ma...@apache.org.
NIFI-2851 initial commit of perf improvements on SplitText

- introduced org.apache.nifi.stream.io.util.TextLineDemarcator
- refactored SplitText to use org.apache.nifi.stream.io.util.TextLineDemarcator
- updated SplitText's capability description to provide more clarity around splits with headers.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/41f519e8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/41f519e8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/41f519e8

Branch: refs/heads/master
Commit: 41f519e84cb5d0bf8be4ce93e26a042d16cda273
Parents: b73ba7f
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Fri Oct 7 11:37:32 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Nov 11 15:45:40 2016 -0500

----------------------------------------------------------------------
 .../nifi/stream/io/util/TextLineDemarcator.java | 253 ++++++
 .../stream/io/util/TextLineDemarcatorTest.java  | 242 ++++++
 .../nifi/processors/standard/SplitText.java     | 770 +++++++------------
 .../nifi/processors/standard/TestSplitText.java |  20 +-
 4 files changed, 772 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
new file mode 100644
index 0000000..e4c213a
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of demarcator of text lines in the provided
+ * {@link InputStream}. It works similar to the {@link BufferedReader} and its
+ * {@link BufferedReader#readLine()} methods except that it does not create a
+ * String representing the text line and instead returns the offset info for the
+ * computed text line. See {@link #nextOffsetInfo()} and
+ * {@link #nextOffsetInfo(byte[])} for more details.
+ * <p>
+ * This class is NOT thread-safe.
+ * </p>
+ */
+public class TextLineDemarcator {
+
+    private static final int INIT_BUFFER_SIZE = 8192;
+
+    private final InputStream is;
+
+    private final int initialBufferSize;
+
+    private byte[] buffer;
+
+    // Position in 'buffer' of the next byte to examine.
+    private int index;
+
+    // Position in 'buffer' where the line currently being scanned starts.
+    private int mark;
+
+    // Offset, relative to the beginning of the stream, of the line currently being scanned.
+    private long offset;
+
+    // Number of valid bytes in 'buffer'; set to -1 once the stream has reported EOF.
+    private int bufferLength;
+
+    /**
+     * Constructs an instance of demarcator with provided {@link InputStream}
+     * and default buffer size.
+     *
+     * @param is the stream to demarcate; must not be null
+     * @throws IllegalArgumentException if <code>is</code> is null
+     */
+    public TextLineDemarcator(InputStream is) {
+        this(is, INIT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs an instance of demarcator with provided {@link InputStream}
+     * and initial buffer size. The buffer grows in increments of
+     * <code>initialBufferSize</code> when a single line does not fit in it.
+     *
+     * @param is the stream to demarcate; must not be null
+     * @param initialBufferSize initial size of the internal buffer; must be &gt; 0
+     * @throws IllegalArgumentException if <code>is</code> is null or
+     *             <code>initialBufferSize</code> is less than 1
+     */
+    public TextLineDemarcator(InputStream is, int initialBufferSize) {
+        if (is == null) {
+            throw new IllegalArgumentException("'is' must not be null.");
+        }
+        if (initialBufferSize < 1) {
+            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
+        }
+        this.is = is;
+        this.initialBufferSize = initialBufferSize;
+        this.buffer = new byte[initialBufferSize];
+    }
+
+    /**
+     * Will compute the next <i>offset info</i> for a text line (line terminated
+     * by either '\r', '\n' or '\r\n'). <br>
+     * The <i>offset info</i> computed and returned as {@link OffsetInfo} where
+     * {@link OffsetInfo#isStartsWithMatch()} will always return true.
+     *
+     * @return offset info, or <code>null</code> if there is no more data in
+     *         the stream
+     * @throws IllegalStateException if reading from the stream fails
+     */
+    public OffsetInfo nextOffsetInfo() {
+        return this.nextOffsetInfo(null);
+    }
+
+    /**
+     * Will compute the next <i>offset info</i> for a text line (line terminated
+     * by either '\r', '\n' or '\r\n'). <br>
+     * The <i>offset info</i> computed and returned as {@link OffsetInfo} where
+     * {@link OffsetInfo#isStartsWithMatch()} will return true if
+     * <code>startsWith</code> was successfully matched with the starting bytes
+     * of the text line (terminator bytes included). A <code>startsWith</code>
+     * that is longer than the line never matches.
+     *
+     * @param startsWith bytes the line is expected to start with; may be
+     *            <code>null</code>, in which case no matching is performed
+     * @return offset info, or <code>null</code> if there is no more data in
+     *         the stream
+     * @throws IllegalStateException if reading from the stream fails
+     */
+    public OffsetInfo nextOffsetInfo(byte[] startsWith) {
+        OffsetInfo offsetInfo = null;
+        int lineLength = 0;
+        byte[] token = null;
+        lineLoop:
+        while (this.bufferLength != -1) {
+            if (this.index >= this.bufferLength) {
+                this.fill();
+            }
+            // 'this.index' itself is the scan cursor. A local copy would go
+            // stale whenever fill() shuffles the buffer during the read-ahead
+            // performed by isEol().
+            while (this.index < this.bufferLength) {
+                final byte byteVal = this.buffer[this.index++];
+                lineLength++;
+                final int crlfLength = this.isEol(byteVal);
+                if (crlfLength > 0) {
+                    if (crlfLength == 2) {
+                        // consume the '\n' that follows the '\r'
+                        this.index++;
+                        lineLength++;
+                    }
+                    offsetInfo = new OffsetInfo(this.offset, lineLength, crlfLength);
+                    if (startsWith != null) {
+                        // must happen before 'mark' is advanced past this line
+                        token = this.extractDataToken(lineLength);
+                    }
+                    this.mark = this.index;
+                    break lineLoop;
+                }
+            }
+        }
+        // EOF where last char(s) are not CRLF.
+        if (lineLength > 0 && offsetInfo == null) {
+            offsetInfo = new OffsetInfo(this.offset, lineLength, 0);
+            if (startsWith != null) {
+                token = this.extractDataToken(lineLength);
+            }
+        }
+        this.offset += lineLength;
+
+        // checks if the new line starts with 'startsWith' chars
+        if (offsetInfo != null && startsWith != null && !hasPrefix(token, startsWith)) {
+            offsetInfo.setStartsWithMatch(0);
+        }
+        return offsetInfo;
+    }
+
+    /**
+     * Determines whether <code>currentByte</code>, which is the byte located
+     * immediately before the current 'index', terminates a line. For '\r' this
+     * looks at the following byte, reading more data from the stream if the
+     * '\r' was the last buffered byte.
+     *
+     * @return 0 if the byte is not a line terminator, 1 for a lone '\n' or
+     *         '\r' (including '\r' as the very last byte of the stream), 2 for
+     *         '\r\n'
+     */
+    private int isEol(byte currentByte) {
+        if (currentByte == '\n') {
+            return 1;
+        }
+        if (currentByte != '\r') {
+            return 0;
+        }
+        if (this.index >= this.bufferLength) {
+            // fill() may expand or shuffle the buffer; it keeps 'index'
+            // pointing at the slot right after the '\r' either way.
+            this.fill();
+            if (this.bufferLength == -1) {
+                // '\r' was the last byte of the stream; nothing to look ahead at
+                return 1;
+            }
+        }
+        return this.buffer[this.index] == '\n' ? 2 : 1;
+    }
+
+    /**
+     * Returns true if <code>data</code> is at least as long as
+     * <code>prefix</code> and begins with the same bytes.
+     */
+    private static boolean hasPrefix(byte[] data, byte[] prefix) {
+        if (data == null || data.length < prefix.length) {
+            return false;
+        }
+        for (int i = 0; i < prefix.length; i++) {
+            if (data[i] != prefix[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Copies <code>length</code> bytes of the current line (starting at
+     * 'mark') out of the buffer. Returns <code>null</code> if
+     * <code>length</code> is not positive.
+     */
+    private byte[] extractDataToken(int length) {
+        byte[] data = null;
+        if (length > 0) {
+            data = new byte[length];
+            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
+        }
+        return data;
+    }
+
+    /**
+     * Will fill the current buffer from current 'index' position, expanding it
+     * and or shuffling it if necessary. Sets 'bufferLength' to -1 when the
+     * stream has no more data.
+     *
+     * @throws IllegalStateException if reading from the stream fails
+     */
+    private void fill() {
+        if (this.index >= this.buffer.length) {
+            if (this.mark == 0) { // expand: the current line occupies the whole buffer
+                byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
+                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
+                this.buffer = newBuff;
+            } else { // shuffle: move the current (partial) line to the front
+                int length = this.index - this.mark;
+                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
+                this.index = length;
+                this.mark = 0;
+            }
+        }
+
+        try {
+            int bytesRead;
+            do {
+                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+            } while (bytesRead == 0);
+            this.bufferLength = bytesRead != -1 ? this.index + bytesRead : -1;
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed while reading InputStream", e);
+        }
+    }
+
+    /**
+     * Container to hold offset and meta info for a computed text line.
+     * The offset and meta info is represented with the following 4 values:
+     *  <ul>
+     *    <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
+     *    <li><i>length</i> - length of the text line including CRLF characters</li>
+     *    <li><i>crlfLength</i> - the length of the CRLF.
+     *                            Value 0 is returned if text line represents the last text line in the
+     *                            {@link InputStream} (i.e., EOF) and such line does not terminate with CR or LF or the combination of the two.
+     *                            Value 1 is returned if text line ends with '\n' or '\r'.
+     *                            Value 2 is returned if line ends with '\r\n'.</li>
+     *    <li><i>startsWithMatch</i> - <code>true</code> by default unless <code>startsWith</code> bytes are provided and not matched.
+     *                                 See {@link #nextOffsetInfo(byte[])} for more info.</li>
+     *  </ul>
+     **/
+    public static class OffsetInfo {
+        private final long startOffset, length;
+        private final int crlfLength;
+
+        private boolean startsWithMatch = true;
+
+        OffsetInfo(long startOffset, long length, int crlfLength) {
+            this.startOffset = startOffset;
+            this.length = length;
+            this.crlfLength = crlfLength;
+        }
+
+        public long getStartOffset() {
+            return startOffset;
+        }
+
+        public long getLength() {
+            return length;
+        }
+
+        public int getCrlfLength() {
+            return this.crlfLength;
+        }
+
+        public boolean isStartsWithMatch() {
+            return this.startsWithMatch;
+        }
+
+        // 1 means "matched"; any other value means "not matched"
+        void setStartsWithMatch(int startsWithMatch) {
+            this.startsWithMatch = startsWithMatch == 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
new file mode 100644
index 0000000..3b64a17
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TextLineDemarcatorTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
+import org.junit.Test;
+
+/**
+ * Tests for {@link TextLineDemarcator}. All input and all <code>startsWith</code>
+ * filters are encoded as UTF-8 so that the comparison does not depend on the
+ * platform default charset.
+ */
+public class TextLineDemarcatorTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nullStream() {
+        new TextLineDemarcator(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void illegalBufferSize() {
+        new TextLineDemarcator(mock(InputStream.class), -234);
+    }
+
+    @Test
+    public void emptyStreamNoStartWithFilter() {
+        String data = "";
+        InputStream is = stringToIs(data);
+        TextLineDemarcator demarcator = new TextLineDemarcator(is);
+        assertNull(demarcator.nextOffsetInfo());
+    }
+
+    @Test
+    public void emptyStreamAndStartWithFilter() {
+        String data = "";
+        InputStream is = stringToIs(data);
+        TextLineDemarcator demarcator = new TextLineDemarcator(is);
+        assertNull(demarcator.nextOffsetInfo("hello".getBytes(StandardCharsets.UTF_8)));
+    }
+
+    @Test
+    public void singleCR() {
+        InputStream is = stringToIs("\r");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is);
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(0, offsetInfo.getStartOffset());
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+    }
+
+    @Test
+    public void singleLF() {
+        InputStream is = stringToIs("\n");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is);
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(0, offsetInfo.getStartOffset());
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+    }
+
+    @Test
+    // essentially validates the internal 'isEol()' operation to ensure it will perform read-ahead
+    public void crlfWhereLFdoesNotFitInInitialBuffer() throws Exception {
+        InputStream is = stringToIs("oleg\r\njoe");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is, 5);
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(0, offsetInfo.getStartOffset());
+        assertEquals(6, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(6, offsetInfo.getStartOffset());
+        assertEquals(3, offsetInfo.getLength());
+        assertEquals(0, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+    }
+
+    @Test
+    public void mixedCRLF() throws Exception {
+        InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(0, offsetInfo.getStartOffset());
+        assertEquals(5, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(5, offsetInfo.getStartOffset());
+        assertEquals(4, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(9, offsetInfo.getStartOffset());
+        assertEquals(6, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo();
+        assertEquals(15, offsetInfo.getStartOffset());
+        assertEquals(11, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+    }
+
+    @Test
+    public void consecutiveAndMixedCRLF() throws Exception {
+        InputStream is = stringToIs("oleg\r\r\njoe\n\n\rjack\n\r\nstacymike\r\n\n\n\r");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
+
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo(); // oleg\r
+        assertEquals(5, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \r\n
+        assertEquals(2, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // joe\n
+        assertEquals(4, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \n
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \r
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // jack\n
+        assertEquals(5, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \r\n
+        assertEquals(2, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // stacymike\r\n
+        assertEquals(11, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \n
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \n
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+
+        offsetInfo = demarcator.nextOffsetInfo();        // \r
+        assertEquals(1, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+    }
+
+    @Test
+    public void startWithNoMatchOnWholeStream() throws Exception {
+        InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is, 4);
+
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes(StandardCharsets.UTF_8));
+        assertEquals(0, offsetInfo.getStartOffset());
+        assertEquals(5, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertFalse(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo("foo".getBytes(StandardCharsets.UTF_8));
+        assertEquals(5, offsetInfo.getStartOffset());
+        assertEquals(4, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertFalse(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo("joe".getBytes(StandardCharsets.UTF_8));
+        assertEquals(9, offsetInfo.getStartOffset());
+        assertEquals(6, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertFalse(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo("stasy".getBytes(StandardCharsets.UTF_8));
+        assertEquals(15, offsetInfo.getStartOffset());
+        assertEquals(11, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertFalse(offsetInfo.isStartsWithMatch());
+    }
+
+    @Test
+    public void startWithSomeMatches() throws Exception {
+        InputStream is = stringToIs("oleg\rjoe\njack\r\nstacymike\r\n");
+        TextLineDemarcator demarcator = new TextLineDemarcator(is, 7);
+
+        OffsetInfo offsetInfo = demarcator.nextOffsetInfo("foojhkj".getBytes(StandardCharsets.UTF_8));
+        assertEquals(0, offsetInfo.getStartOffset());
+        assertEquals(5, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertFalse(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo("jo".getBytes(StandardCharsets.UTF_8));
+        assertEquals(5, offsetInfo.getStartOffset());
+        assertEquals(4, offsetInfo.getLength());
+        assertEquals(1, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo("joe".getBytes(StandardCharsets.UTF_8));
+        assertEquals(9, offsetInfo.getStartOffset());
+        assertEquals(6, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertFalse(offsetInfo.isStartsWithMatch());
+
+        offsetInfo = demarcator.nextOffsetInfo("stacy".getBytes(StandardCharsets.UTF_8));
+        assertEquals(15, offsetInfo.getStartOffset());
+        assertEquals(11, offsetInfo.getLength());
+        assertEquals(2, offsetInfo.getCrlfLength());
+        assertTrue(offsetInfo.isStartsWithMatch());
+    }
+
+    // Wraps the UTF-8 encoding of 'data' in an in-memory stream.
+    private InputStream stringToIs(String data) {
+        return new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index 81c06f8..2cbcd44 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -16,13 +16,11 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.BitSet;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,9 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -46,26 +42,22 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.util.TextLineDemarcator;
+import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
 
 @EventDriven
 @SideEffectFree
@@ -76,7 +68,12 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
         + "or total size of fragment. Each output split file will contain no more than the configured number of lines or bytes. "
         + "If both Line Split Count and Maximum Fragment Size are specified, the split occurs at whichever limit is reached first. "
         + "If the first line of a fragment exceeds the Maximum Fragment Size, that line will be output in a single split file which "
-        +" exceeds the configured maximum size limit.")
+        + "exceeds the configured maximum size limit. This component also allows one to specify that each split should include a header "
+        + "lines. Header lines can be computed by either specifying the amount of lines that should constitute a header or by using header "
+        + "marker to match against the read lines. If such match happens then the corresponding line will be treated as header. Keep in mind "
+        + "that upon the first failure of header marker match, no more marches will be performed and the rest of the data will be parsed as "
+        + "regular lines for a given split. If after computation of the header there are no more data, the resulting split will consists "
+        + "of only header lines.")
 @WritesAttributes({
     @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text from the original FlowFile that were copied to this FlowFile"),
     @WritesAttribute(attribute = "fragment.size", description = "The number of bytes from the original FlowFile that were copied to this FlowFile, " +
@@ -87,7 +84,6 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
     @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")})
 @SeeAlso(MergeContent.class)
 public class SplitText extends AbstractProcessor {
-
     // attribute keys
     public static final String SPLIT_LINE_COUNT = "text.line.count";
     public static final String FRAGMENT_SIZE = "fragment.size";
@@ -150,548 +146,316 @@ public class SplitText extends AbstractProcessor {
             .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
             .build();
 
-    private List<PropertyDescriptor> properties;
-    private Set<Relationship> relationships;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(LINE_SPLIT_COUNT);
-        properties.add(FRAGMENT_MAX_SIZE);
-        properties.add(HEADER_LINE_COUNT);
-        properties.add(HEADER_MARKER);
-        properties.add(REMOVE_TRAILING_NEWLINES);
-        this.properties = Collections.unmodifiableList(properties);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
-        relationships.add(REL_SPLITS);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
+    private static final List<PropertyDescriptor> properties;
+    private static final Set<Relationship> relationships;
+
+    static {
+        properties = Collections.unmodifiableList(Arrays.asList(new PropertyDescriptor[]{
+                LINE_SPLIT_COUNT,
+                FRAGMENT_MAX_SIZE,
+                HEADER_LINE_COUNT,
+                HEADER_MARKER,
+                REMOVE_TRAILING_NEWLINES
+        }));
+
+        relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(new Relationship[]{
+                REL_ORIGINAL,
+                REL_SPLITS,
+                REL_FAILURE
+        })));
     }
 
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        List<ValidationResult> results = new ArrayList<>();
+    private volatile boolean removeTrailingNewLines;
 
-        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
-                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
+    private volatile long maxSplitSize;
 
-        results.add(new ValidationResult.Builder()
-            .subject("Maximum Fragment Size")
-            .valid(!invalidState)
-            .explanation("Property must be specified when Line Split Count is 0")
-            .build()
-        );
-        return results;
-    }
+    private volatile int lineCount;
+
+    private volatile int headerLineCount;
+
+    private volatile String headerMarker;
 
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
     }
 
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
-                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
-        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-
-        byte[] leadingBytes = leadingNewLineBytes;
-        int numLines = 0;
-        long totalBytes = 0L;
-        for (int i = 0; i < maxNumLines; i++) {
-            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
-            final long bytes = eolMarker.getBytesConsumed();
-            leadingBytes = eolMarker.getLeadingNewLineBytes();
-
-            if (includeLineDelimiter && out != null) {
-                if (leadingBytes != null) {
-                    out.write(leadingBytes);
-                    leadingBytes = null;
-                }
-                eolBuffer.drainTo(out);
-            }
-            totalBytes += bytes;
-            if (bytes <= 0) {
-                return numLines;
-            }
-            numLines++;
-            if (totalBytes >= maxByteCount) {
-                break;
-            }
-        }
-        return numLines;
+    @OnScheduled
+    public void onSchedule(ProcessContext context) {
+        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
+                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
+        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
+                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
+        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
+        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
+        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
     }
 
-    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
-                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
-        long bytesRead = 0L;
-        final ByteArrayOutputStream buffer;
-        if (out != null) {
-            buffer = new ByteArrayOutputStream();
-        } else {
-            buffer = null;
+    /**
+     * Splits the incoming stream, releasing all resulting splits as FlowFiles at once.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
+        FlowFile sourceFlowFile = processSession.get();
+        if (sourceFlowFile == null) {
+            return;
         }
-        byte[] bytesToWriteFirst = leadingNewLineBytes;
-
-        in.mark(Integer.MAX_VALUE);
-        while (true) {
-            final int nextByte = in.read();
-
-            // if we hit end of stream we're done
-            if (nextByte == -1) {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                }
-                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
-            }
-
-            // Verify leading bytes do not violate size limitation
-            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
-                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
-            }
-            // Write leadingNewLines, if appropriate
-            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
-                bytesRead += bytesToWriteFirst.length;
-                buffer.write(bytesToWriteFirst);
-                bytesToWriteFirst = null;
-            }
-            // buffer the output
-            bytesRead++;
-            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
-                if (bytesToWriteFirst != null) {
-                    buffer.write(bytesToWriteFirst);
-                }
-                bytesToWriteFirst = null;
-                eolBuffer.drainTo(buffer);
-                eolBuffer.clear();
-                buffer.write(nextByte);
-            }
-
-            // check the size limit
-            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
-                in.reset();
-                if (buffer != null) {
-                    buffer.close();
-                }
-                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
-            }
-
-            // if we have a new line, then we're done
-            if (nextByte == '\n') {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                    eolBuffer.addEndOfLine(false, true);
-                }
-                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
-            }
-
-            // Determine if \n follows \r; in either case, end of line has been reached
-            if (nextByte == '\r') {
-                if (buffer != null) {
-                    buffer.writeTo(out);
-                    buffer.close();
-                }
-                in.mark(1);
-                final int lookAheadByte = in.read();
-                if (lookAheadByte == '\n') {
-                    eolBuffer.addEndOfLine(true, true);
-                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
-                } else {
-                    in.reset();
-                    eolBuffer.addEndOfLine(true, false);
-                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
+        AtomicBoolean error = new AtomicBoolean();
+        List<SplitInfo> computedSplitsInfo = new ArrayList<>();
+        AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
+        processSession.read(sourceFlowFile, new InputStreamCallback() {
+            @Override
+            public void process(InputStream in) throws IOException {
+                TextLineDemarcator demarcator = new TextLineDemarcator(in);
+                SplitInfo splitInfo = null;
+                long startOffset = 0;
+
+                // Compute fragment representing the header (if available)
+                long start = System.nanoTime();
+                try {
+                    if (SplitText.this.headerLineCount > 0) {
+                        splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
+                        if (splitInfo.lineCount < SplitText.this.headerLineCount) {
+                            error.set(true);
+                            getLogger().error("Unable to split " + sourceFlowFile + " due to insufficient amount of header lines. Required "
+                                    + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
+                        }
+                    } else if (SplitText.this.headerMarker != null) {
+                        splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
+                    }
+                    headerSplitInfoRef.set(splitInfo);
+                } catch (IllegalStateException e) {
+                    error.set(true);
+                    getLogger().error(e.getMessage() + " Routing to failure.", e);
                 }
-            }
-        }
-    }
 
-    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
-                                       final long bufferedBytes) throws IOException {
-        final SplitInfo info = new SplitInfo();
-        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
-        int lastByte = -1;
-        info.lengthBytes = bufferedBytes;
-        long lastEolBufferLength = 0L;
-
-        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
-                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
-                && eolBuffer.length() < maxSize) {
-            in.mark(1);
-            final int nextByte = in.read();
-            // Check for \n following \r on last line
-            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
-                in.reset();
-                break;
-            }
-            switch (nextByte) {
-                case -1:
-                    info.endOfStream = true;
-                    if (keepAllNewLines) {
-                        info.lengthBytes += eolBuffer.length();
+                // Compute and collect fragments representing the individual splits
+                if (!error.get()) {
+                    if (headerSplitInfoRef.get() != null) {
+                        startOffset = headerSplitInfoRef.get().length;
                     }
-                    if (lastByte != '\r') {
-                        info.lengthLines++;
+                    long preAccumulatedLength = startOffset;
+                    while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
+                        computedSplitsInfo.add(splitInfo);
+                        startOffset += splitInfo.length;
                     }
-                    info.bufferedBytes = 0;
-                    return info;
-                case '\r':
-                    eolBuffer.addEndOfLine(true, false);
-                    info.lengthLines++;
-                    info.bufferedBytes = 0;
-                    break;
-                case '\n':
-                    eolBuffer.addEndOfLine(false, true);
-                    if (lastByte != '\r') {
-                        info.lengthLines++;
+                    long stop = System.nanoTime();
+                    if (getLogger().isDebugEnabled()) {
+                        getLogger().debug("Computed splits in " + (stop - start) + " nanoseconds.");
                     }
-                    info.bufferedBytes = 0;
-                    break;
-                default:
-                    if (eolBuffer.length() > 0) {
-                        info.lengthBytes += eolBuffer.length();
-                        lastEolBufferLength = eolBuffer.length();
-                        eolBuffer.clear();
-                    }
-                    info.lengthBytes++;
-                    info.bufferedBytes++;
-                    break;
+                }
             }
-            lastByte = nextByte;
-        }
-        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
-        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
-            info.lengthBytes -= lastEolBufferLength;
-        }
-        if (keepAllNewLines) {
-            info.lengthBytes += eolBuffer.length();
-        }
-        return info;
-    }
+        });
 
-    private int countHeaderLines(final ByteCountingInputStream in,
-                                 final String headerMarker) throws IOException {
-        int headerInfo = 0;
-
-        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
-        in.mark(Integer.MAX_VALUE);
-        String line = br.readLine();
-        while (line != null) {
-            // if line is not a header line, reset stream and return header counts
-            if (!line.startsWith(headerMarker)) {
-                in.reset();
-                return headerInfo;
-            } else {
-                headerInfo++;
-            }
-            line = br.readLine();
+        if (error.get()){
+            processSession.transfer(sourceFlowFile, REL_FAILURE);
+        } else {
+            List<FlowFile> splitFlowFiles = this.generateSplitFlowFiles(sourceFlowFile, headerSplitInfoRef.get(), computedSplitsInfo, processSession);
+            processSession.transfer(sourceFlowFile, REL_ORIGINAL);
+            processSession.transfer(splitFlowFiles, REL_SPLITS);
         }
-        in.reset();
-        return headerInfo;
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        final FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final ComponentLog logger = getLogger();
-        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
-        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
-                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
-        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
-                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
-        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
-        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
-
-        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
-        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
-
-        final long startNanos = System.nanoTime();
-        final List<FlowFile> splits = new ArrayList<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream rawIn) throws IOException {
-                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
-                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
-                    long bufferedPartialLine = 0;
-
-                    // if we have header lines, copy them into a ByteArrayOutputStream
-                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
-                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
-                    int headerInfoLineCount = 0;
-                    if (headerCount > 0) {
-                        headerInfoLineCount = headerCount;
-                    } else {
-                        if (headerMarker != null) {
-                            headerInfoLineCount = countHeaderLines(in, headerMarker);
-                        }
-                    }
-
-                    final byte[] headerNewLineBytes;
-                    final byte[] headerBytesWithoutTrailingNewLines;
-                    if (headerInfoLineCount > 0) {
-                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
-
-                        if (headerLinesCopied < headerInfoLineCount) {
-                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
-                            return;
-                        }
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
+                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
+        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
+                .explanation("Property must be specified when Line Split Count is 0").build());
+        return results;
+    }
 
-                        // Break header apart into trailing newlines and remaining text
-                        final byte[] headerBytes = headerStream.toByteArray();
-                        int headerNewLineByteCount = 0;
-                        for (int i = headerBytes.length - 1; i >= 0; i--) {
-                            final byte headerByte = headerBytes[i];
-
-                            if (headerByte == '\r' || headerByte == '\n') {
-                                headerNewLineByteCount++;
-                            } else {
-                                break;
-                            }
-                        }
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
 
-                        if (headerNewLineByteCount == 0) {
-                            headerNewLineBytes = null;
-                            headerBytesWithoutTrailingNewLines = headerBytes;
+    /**
+     * Generates the list of {@link FlowFile}s representing splits. If the
+     * {@link SplitInfo} provided as an argument to this operation is not null,
+     * it signifies the header information and its contents will be included in
+     * each and every computed split.
+     */
+    private List<FlowFile> generateSplitFlowFiles(FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){
+        List<FlowFile> splitFlowFiles = new ArrayList<>();
+        FlowFile headerFlowFile = null;
+        long headerCrlfLength = 0;
+        if (splitInfo != null) {
+            headerFlowFile = processSession.clone(sourceFlowFile, splitInfo.startOffset, splitInfo.length);
+            headerCrlfLength = splitInfo.trimmedLength;
+        }
+        int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
+        String fragmentId = UUID.randomUUID().toString();
+
+        if (computedSplitsInfo.size() == 0) {
+            FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
+            splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(),
+                    fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+            splitFlowFiles.add(splitFlowFile);
+        } else {
+            for (SplitInfo computedSplitInfo : computedSplitsInfo) {
+                long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
+                boolean proceedWithClone = headerFlowFile != null || length > 0;
+                if (proceedWithClone) {
+                    FlowFile splitFlowFile = null;
+                    if (headerFlowFile != null) {
+                        if (length > 0) {
+                            splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
+                            splitFlowFile = processSession.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                         } else {
-                            headerNewLineBytes = new byte[headerNewLineByteCount];
-                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
-
-                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
-                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
+                            splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                         }
                     } else {
-                        headerBytesWithoutTrailingNewLines = null;
-                        headerNewLineBytes = null;
+                        splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
                     }
 
-                    while (true) {
-                        if (headerInfoLineCount > 0) {
-                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
-                            // and then start copying lines
-                            final AtomicInteger linesCopied = new AtomicInteger(0);
-                            final AtomicLong bytesCopied = new AtomicLong(0L);
-                            FlowFile splitFile = session.create(flowFile);
-                            try {
-                                splitFile = session.write(splitFile, new OutputStreamCallback() {
-                                    @Override
-                                    public void process(final OutputStream rawOut) throws IOException {
-                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
-                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
-                                            countingOut.write(headerBytesWithoutTrailingNewLines);
-                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
-                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
-                                                    includeLineDelimiter, headerNewLineBytes));
-                                            bytesCopied.set(countingOut.getBytesWritten());
-                                        }
-                                    }
-                                });
-                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
-                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
-                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
-                            } finally {
-                                if (linesCopied.get() > 0) {
-                                    splits.add(splitFile);
-                                } else {
-                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
-                                    // the last flow file will contain just a header; don't forward that one
-                                    session.remove(splitFile);
-                                }
-                            }
-
-                            // Check for EOF
-                            in.mark(1);
-                            if (in.read() == -1) {
-                                break;
-                            }
-                            in.reset();
-
-                        } else {
-                            // We have no header lines, so we can simply demarcate the original File via the
-                            // ProcessSession#clone method.
-                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
-                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
-                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
-                                bufferedPartialLine = info.bufferedBytes;
-                            }
-                            if (info.endOfStream) {
-                                // stream is out of data
-                                if (info.lengthBytes > 0) {
-                                    info.offsetBytes = beforeReadingLines;
-                                    splitInfos.add(info);
-                                    final long procNanos = System.nanoTime() - startNanos;
-                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
-                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
-                                                    + "total splits = {}; total processing time = {} ms",
-                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
-                                }
-                                break;
-                            } else {
-                                if (info.lengthBytes != 0) {
-                                    info.offsetBytes = beforeReadingLines;
-                                    info.lengthBytes -= bufferedPartialLine;
-                                    splitInfos.add(info);
-                                    final long procNanos = System.nanoTime() - startNanos;
-                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
-                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
-                                                    + "total splits = {}; total processing time = {} ms",
-                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
-                                }
-                            }
-                        }
-                    }
+                    splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
+                            computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                    splitFlowFiles.add(splitFlowFile);
                 }
             }
-        });
-
-        if (errorMessage.get() != null) {
-            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
-            session.transfer(flowFile, REL_FAILURE);
-            if (!splits.isEmpty()) {
-                session.remove(splits);
-            }
-            return;
         }
 
-        if (!splitInfos.isEmpty()) {
-            // Create the splits
-            for (final SplitInfo info : splitInfos) {
-                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
-                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
-                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
-                splits.add(split);
-            }
+        getLogger().info("Split " + sourceFlowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
+        if (headerFlowFile != null) {
+            processSession.remove(headerFlowFile);
         }
-        finishFragmentAttributes(session, flowFile, splits);
-
-        if (splits.size() > 10) {
-            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
-        } else {
-            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
-        }
-
-        session.transfer(flowFile, REL_ORIGINAL);
-        session.transfer(splits, REL_SPLITS);
+        return splitFlowFiles;
     }
 
-    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
-        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
-
-        final String fragmentId = UUID.randomUUID().toString();
-        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
-        splits.clear();
-        for (int i = 1; i <= newList.size(); i++) {
-            FlowFile ff = newList.get(i - 1);
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put(FRAGMENT_ID, fragmentId);
-            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
-            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
-            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
-            FlowFile newFF = session.putAllAttributes(ff, attributes);
-            splits.add(newFF);
-        }
+    private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize,
+            String splitId, int splitIndex, int splitCount, String origFileName) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(SPLIT_LINE_COUNT, String.valueOf(splitLineCount));
+        attributes.put(FRAGMENT_SIZE, String.valueOf(splitFlowFile.getSize()));
+        attributes.put(FRAGMENT_ID, splitId);
+        attributes.put(FRAGMENT_INDEX, String.valueOf(splitIndex));
+        attributes.put(FRAGMENT_COUNT, String.valueOf(splitCount));
+        attributes.put(SEGMENT_ORIGINAL_FILENAME, origFileName);
+        return processSession.putAllAttributes(splitFlowFile, attributes);
     }
 
-    private static class SplitInfo {
-
-        public long offsetBytes;
-        public long lengthBytes;
-        public long lengthLines;
-        public long bufferedBytes;
-        public boolean endOfStream;
-
-        public SplitInfo() {
-            this.offsetBytes = 0L;
-            this.lengthBytes = 0L;
-            this.lengthLines = 0L;
-            this.bufferedBytes = 0L;
-            this.endOfStream = false;
-        }
-    }
-
-    public static class EndOfLineBuffer {
-        private static final byte CARRIAGE_RETURN = (byte) '\r';
-        private static final byte NEWLINE = (byte) '\n';
-
-        private final BitSet buffer = new BitSet();
-        private int index = 0;
-
-        public void clear() {
-            index = 0;
-        }
-
-        public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
-            buffer.set(index++, carriageReturn);
-            buffer.set(index++, newLine);
-        }
-
-        private void drainTo(final OutputStream out) throws IOException {
-            for (int i = 0; i < index; i += 2) {
-                final boolean cr = buffer.get(i);
-                final boolean nl = buffer.get(i + 1);
-
-                // we've consumed all data in the buffer
-                if (!cr && !nl) {
-                    return;
-                }
-
-                if (cr) {
-                    out.write(CARRIAGE_RETURN);
+    /**
+     * Will generate {@link SplitInfo} for the next fragment that represents the
+     * header of the future split.
+     *
+     * If split size is controlled by the number of lines in the split, then the
+     * resulting {@link SplitInfo} line count will always be <= 'splitMaxLineCount'. It can only be less if EOF is reached first.
+     * If split size is controlled by {@link #maxSplitSize}, then the resulting {@link SplitInfo} line count
+     * will vary, but the header may not grow beyond {@link #maxSplitSize}; if it would, an {@link IllegalStateException} is thrown.
+     * This method also accepts a 'startsWithFilter' so that header lines can be identified by that filter (see {@link #HEADER_MARKER}).
+     */
+    private SplitInfo computeHeader(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, byte[] startsWithFilter, SplitInfo previousSplitInfo) {
+        long length = 0;
+        long actualLineCount = 0;
+        OffsetInfo offsetInfo = null;
+        SplitInfo splitInfo = null;
+        OffsetInfo previousOffsetInfo = null;
+        long lastCrlfLength = 0;
+        while ((offsetInfo = demarcator.nextOffsetInfo(startsWithFilter)) != null) {
+            lastCrlfLength = offsetInfo.getCrlfLength();
+
+            if (startsWithFilter != null && !offsetInfo.isStartsWithMatch()) {
+                if (offsetInfo.getCrlfLength() != -1) {
+                    previousOffsetInfo = offsetInfo;
                 }
-
-                if (nl) {
-                    out.write(NEWLINE);
+                break;
+            } else {
+                if (length + offsetInfo.getLength() > this.maxSplitSize) {
+                    throw new IllegalStateException( "Computing header resulted in header size being > MAX split size of " + this.maxSplitSize + ".");
+                } else {
+                    length += offsetInfo.getLength();
+                    actualLineCount++;
+                    if (actualLineCount == splitMaxLineCount) {
+                        break;
+                    }
                 }
             }
-
-            clear();
         }
 
-        /**
-         * @return the number of line endings in the buffer
-         */
-        public int length() {
-            return index / 2;
+        if (actualLineCount > 0) {
+            splitInfo = new SplitInfo(startOffset, length, lastCrlfLength, actualLineCount, previousOffsetInfo);
         }
+        return splitInfo;
     }
 
-    public static class EndOfLineMarker {
-        private final long bytesConsumed;
-        private final EndOfLineBuffer eolBuffer;
-        private final boolean streamEnded;
-        private final byte[] leadingNewLineBytes;
-
-        public EndOfLineMarker(final long bytesCounted, final EndOfLineBuffer eolBuffer, final boolean streamEnded, final byte[] leadingNewLineBytes) {
-            this.bytesConsumed = bytesCounted;
-            this.eolBuffer = eolBuffer;
-            this.streamEnded = streamEnded;
-            this.leadingNewLineBytes = leadingNewLineBytes;
+    /**
+     * Will generate {@link SplitInfo} for the next split.
+     *
+     * If split size is controlled by the amount of lines in the split then the resulting
+     * {@link SplitInfo} line count will always be <= 'splitMaxLineCount'.
+     * If split size is controlled by {@link #maxSplitSize}, then the resulting {@link SplitInfo} line count will vary and the
+     * split length will not exceed {@link #maxSplitSize}, except when the first line alone does not fit; that single line then forms the split.
+     */
+    private SplitInfo nextSplit(TextLineDemarcator demarcator, long startOffset, long splitMaxLineCount, SplitInfo remainderSplitInfo, long startingLength) {
+        long length = 0;
+        long trailingCrlfLength = 0;
+        long actualLineCount = 0;
+        OffsetInfo offsetInfo = null;
+        SplitInfo splitInfo = null;
+        // The remainder from the previous read: the line that would have made the previous
+        // split size > 'maxSplitSize'. It is carried over and counted as the first line of this split.
+        if (remainderSplitInfo != null && remainderSplitInfo.remaningOffsetInfo != null) {
+            length += remainderSplitInfo.remaningOffsetInfo.getLength();
+            actualLineCount++;
         }
+        OffsetInfo remaningOffsetInfo = null;
+        long lastCrlfLength = 0;
+        while ((offsetInfo = demarcator.nextOffsetInfo()) != null) {
+            lastCrlfLength = offsetInfo.getCrlfLength();
+
+            if (offsetInfo.getLength() == offsetInfo.getCrlfLength()) {
+                trailingCrlfLength += offsetInfo.getCrlfLength();
+            } else if (offsetInfo.getLength() > offsetInfo.getCrlfLength()) {
+                trailingCrlfLength = 0; // non-empty line came in, thus resetting counter
+            }
 
-        public long getBytesConsumed() {
-            return bytesConsumed;
+            if (length + offsetInfo.getLength() + startingLength > this.maxSplitSize) {
+                if (length == 0) { // single line per split
+                    length += offsetInfo.getLength();
+                    actualLineCount++;
+                } else {
+                    remaningOffsetInfo = offsetInfo;
+                }
+                break;
+            } else {
+                length += offsetInfo.getLength();
+                actualLineCount++;
+                if (splitMaxLineCount > 0 && actualLineCount >= splitMaxLineCount) {
+                    break;
+                }
+            }
         }
 
-        public EndOfLineBuffer getEndOfLineBuffer() {
-            return eolBuffer;
+        if (actualLineCount > 0) {
+            if (length - trailingCrlfLength >= lastCrlfLength) {
+                trailingCrlfLength += lastCrlfLength; // trim CRLF from the last line
+            }
+            splitInfo = new SplitInfo(startOffset, length, length - trailingCrlfLength, actualLineCount, remaningOffsetInfo);
         }
+        return splitInfo;
+    }
 
-        public boolean isStreamEnded() {
-            return streamEnded;
+    /**
+     * Container for hosting meta-information pertaining to the split so it can
+     * be used later to create {@link FlowFile} representing the split.
+     */
+    private class SplitInfo {
+        final long startOffset, length, trimmedLength, lineCount;
+        OffsetInfo remaningOffsetInfo;
+
+        SplitInfo(long startOffset, long length, long trimmedLength, long lineCount, OffsetInfo remaningOffsetInfo) {
+            this.startOffset = startOffset;
+            this.length = length;
+            this.lineCount = lineCount;
+            this.remaningOffsetInfo = remaningOffsetInfo;
+            this.trimmedLength = trimmedLength;
         }
 
-        public byte[] getLeadingNewLineBytes() {
-            return leadingNewLineBytes;
+        @Override
+        public String toString() {
+            return "offset:" + startOffset + "; length:" + length + "; lineCount:" + lineCount;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/41f519e8/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
index c7ac4f6..1a2089c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java
@@ -17,12 +17,6 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-
-import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -30,6 +24,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
 public class TestSplitText {
 
     final String originalFilename = "original.txt";
@@ -176,7 +176,7 @@ public class TestSplitText {
 
         runner.assertTransferCount(SplitText.REL_FAILURE, 0);
         runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
-        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 1);
 
         // repeat with header count versus header marker
         runner.clearTransferState();
@@ -189,7 +189,7 @@ public class TestSplitText {
 
         runner.assertTransferCount(SplitText.REL_FAILURE, 0);
         runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
-        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 1);
 
         // repeat single header line with no newline characters
         runner.clearTransferState();
@@ -202,7 +202,7 @@ public class TestSplitText {
 
         runner.assertTransferCount(SplitText.REL_FAILURE, 0);
         runner.assertTransferCount(SplitText.REL_ORIGINAL, 1);
-        runner.assertTransferCount(SplitText.REL_SPLITS, 0);
+        runner.assertTransferCount(SplitText.REL_SPLITS, 1);
     }
 
     @Test
@@ -813,7 +813,7 @@ public class TestSplitText {
 
         final List<MockFlowFile> splits = splitRunner.getFlowFilesForRelationship(SplitText.REL_SPLITS);
         splits.get(0).assertContentEquals("\n\n1");
-        splits.get(1).assertContentEquals("");
+        splits.get(1).assertContentEquals("\n");
     }
 
 }