You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by olegz <gi...@git.apache.org> on 2016/10/07 15:38:40 UTC

[GitHub] nifi pull request #1116: NIFI-2851 initial comit of perf improvements on Spl...

GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/1116

    NIFI-2851 initial comit of perf improvements on SplitText

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-2851

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1116
    
----
commit 0818d8689cb474bb1ad81dcd672cb0e31078825e
Author: Oleg Zhurakousky <ol...@suitcase.io>
Date:   2016-10-07T15:37:32Z

    NIFI-2851 initial comit of perf improvements on SplitText

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260752
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
    +                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null || length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
    +                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        List<ValidationResult> results = new ArrayList<>();
    +        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    +                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    +        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
    +                .explanation("Property must be specified when Line Split Count is 0").build());
    +        return results;
         }
     
    -    private static class SplitInfo {
    -
    -        public long offsetBytes;
    -        public long lengthBytes;
    -        public long lengthLines;
    -        public long bufferedBytes;
    -        public boolean endOfStream;
    -
    -        public SplitInfo() {
    -            this.offsetBytes = 0L;
    -            this.lengthBytes = 0L;
    -            this.lengthLines = 0L;
    -            this.bufferedBytes = 0L;
    -            this.endOfStream = false;
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return Collections.unmodifiableList(properties);
    --- End diff --
    
    Since we already have a List<PropertyDescriptor> as a member variable, why not just make the member variable an unmodifiable list and return that instead of creating a new one each time?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83263078
  
    --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.stream.io.util;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * Implementation of demarcator of text lines in the provided
    + * {@link InputStream}. It works similar to the {@link BufferedReader} and its
    + * {@link BufferedReader#readLine()} methods except that it does not create a
    + * String representing the text line and instead returns the offset info for the
    + * computed text line. See {@link #nextOffsetInfo()} and
    + * {@link #nextOffsetInfo(byte[])} for more details.
    + * <p>
    + * This class is NOT thread-safe.
    + * </p>
    + */
    +public class TextLineDemarcator {
    +
    +    private final static int INIT_BUFFER_SIZE = 8192;
    +
    +    private final InputStream is;
    +
    +    private final int initialBufferSize;
    +
    +    private byte[] buffer;
    +
    +    private int index;
    +
    +    private int mark;
    +
    +    private long offset;
    +
    +    private int bufferLength;
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and default buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is) {
    +        this(is, INIT_BUFFER_SIZE);
    +    }
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and initial buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is, int initialBufferSize) {
    +        if (is == null) {
    +            throw new IllegalArgumentException("'is' must not be null.");
    +        }
    +        if (initialBufferSize < 1) {
    +            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
    +        }
    +        this.is = is;
    +        this.initialBufferSize = initialBufferSize;
    +        this.buffer = new byte[initialBufferSize];
    +    }
    +
    +    /**
    +     * Will compute the next <i>offset info</i> for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     * <br>
    +     * The <i>offset info</i> computed and returned as <code>long[]</code> consisting of
    +     * 4 elements <code>{startOffset, length, crlfLength, startsWithMatch}</code>.
    +     *  <ul>
    +     *    <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
    +     *    <li><i>length</i> - length of the text line including CRLF characters</li>
    +     *    <li><i>crlfLength</i> - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *                                          or 2 (if line ends with '\r\n').</li>
    +     *    <li><i>startsWithMatch</i> - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.</li>
    +     *  </ul>
    +     *
    +     * @return offset info as <code>long[]</code>
    +     */
    +    public long[] nextOffsetInfo() {
    +        return this.nextOffsetInfo(null);
    +    }
    +
    +    /**
    +     * Will compute the next <i>offset info</i> for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     * <br>
    +     * The <i>offset info</i> computed and returned as <code>long[]</code> consisting of
    +     * 4 elements <code>{startOffset, length, crlfLength, startsWithMatch}</code>.
    +     *  <ul>
    +     *    <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
    +     *    <li><i>length</i> - length of the text line including CRLF characters</li>
    +     *    <li><i>crlfLength</i> - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *                                          or 2 (if line ends with '\r\n').</li>
    +     *    <li><i>startsWithMatch</i> - value is always 1 unless 'startsWith' is provided. If 'startsWith' is provided it will
    +     *                                          be compared to the computed text line and if matches the value
    +     *                                          will remain 1 otherwise it will be set to 0. </li>
    +     *  </ul>
    +     * @param startsWith - bytes
    +     * @return offset info as <code>long[]</code>
    +     */
    +    public long[] nextOffsetInfo(byte[] startsWith) {
    +        long[] offsetInfo = null;
    +        int lineLength = 0;
    +        byte[] token = null;
    +        lineLoop:
    +        while (this.bufferLength != -1) {
    +            if (this.index >= this.bufferLength) {
    +                this.fill();
    +            }
    +            if (this.bufferLength != -1) {
    +                int i;
    +                byte byteVal;
    +                for (i = this.index; i < this.bufferLength; i++) {
    +                    byteVal = this.buffer[i];
    +                    lineLength++;
    +                    int crlfLength = isEol(byteVal, i);
    +                    if (crlfLength > 0) {
    +                        i += crlfLength;
    +                        if (crlfLength == 2) {
    +                            lineLength++;
    +                        }
    +                        offsetInfo = new long[] { this.offset, lineLength, crlfLength, 1 };
    +                        if (startsWith != null) {
    +                            token = this.extractDataToken(lineLength);
    +                        }
    +                        this.index = i;
    +                        this.mark = this.index;
    +                        break lineLoop;
    +                    }
    +                }
    +                this.index = i;
    +            }
    +        }
    +        // EOF where last char(s) are not CRLF.
    +        if (lineLength > 0 && offsetInfo == null) {
    +            offsetInfo = new long[] { this.offset, lineLength, 0, 1 };
    +            if (startsWith != null) {
    +                token = this.extractDataToken(lineLength);
    +            }
    +        }
    +        this.offset += lineLength;
    +
    +        // checks if the new line starts with 'startsWith' chars
    +        if (startsWith != null) {
    +            for (int i = 0; i < startsWith.length; i++) {
    +                byte sB = startsWith[i];
    +                if (token != null && sB != token[i]) {
    +                    offsetInfo[3] = 0;
    +                    break;
    +                }
    +            }
    +        }
    +        return offsetInfo;
    +    }
    +
    +    /**
    +     *
    +     */
    +    private int isEol(byte currentByte, int currentIndex) {
    +        int crlfLength = 0;
    +        if (currentByte == '\n') {
    +            crlfLength = 1;
    +        } else if (currentByte == '\r') {
    +            if ((currentIndex + 1) >= this.bufferLength) {
    +                this.index = currentIndex + 1;
    +                this.fill();
    +            }
    +            currentByte = this.buffer[currentIndex + 1];
    +            crlfLength = currentByte == '\n' ? 2 : 1;
    +        }
    +        return crlfLength;
    +    }
    +
    +    /**
    +     *
    +     */
    +    private byte[] extractDataToken(int length) {
    +        byte[] data = null;
    +        if (length > 0) {
    +            data = new byte[length];
    +            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
    +        }
    +        return data;
    +    }
    +
    +    /**
    +     * Will fill the current buffer from current 'index' position, expanding it
    +     * and or shuffling it if necessary
    +     */
    +    private void fill() {
    +        if (this.index >= this.buffer.length) {
    +            if (this.mark == 0) { // expand
    +                byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
    +                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
    +                this.buffer = newBuff;
    +            } else { // shuffle
    +                int length = this.index - this.mark;
    +                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
    +                this.index = length;
    +                this.mark = 0;
    --- End diff --
    
    I believe this.bufferLength should be set to length here, no? This is the same bug that was found recently in StreamDemarcator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83643595
  
    --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.stream.io.util;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * Implementation of demarcator of text lines in the provided
    + * {@link InputStream}. It works similar to the {@link BufferedReader} and its
    + * {@link BufferedReader#readLine()} methods except that it does not create a
    + * String representing the text line and instead returns the offset info for the
    + * computed text line. See {@link #nextOffsetInfo()} and
    + * {@link #nextOffsetInfo(byte[])} for more details.
    + * <p>
    + * This class is NOT thread-safe.
    + * </p>
    + */
    +public class TextLineDemarcator {
    +
    +    private final static int INIT_BUFFER_SIZE = 8192;
    +
    +    private final InputStream is;
    +
    +    private final int initialBufferSize;
    +
    +    private byte[] buffer;
    +
    +    private int index;
    +
    +    private int mark;
    +
    +    private long offset;
    +
    +    private int bufferLength;
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and default buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is) {
    +        this(is, INIT_BUFFER_SIZE);
    +    }
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and initial buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is, int initialBufferSize) {
    +        if (is == null) {
    +            throw new IllegalArgumentException("'is' must not be null.");
    +        }
    +        if (initialBufferSize < 1) {
    +            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
    +        }
    +        this.is = is;
    +        this.initialBufferSize = initialBufferSize;
    +        this.buffer = new byte[initialBufferSize];
    +    }
    +
    +    /**
    +     * Will compute the next <i>offset info</i> for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     * <br>
    +     * The <i>offset info</i> computed and returned as <code>long[]</code> consisting of
    +     * 4 elements <code>{startOffset, length, crlfLength, startsWithMatch}</code>.
    +     *  <ul>
    +     *    <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
    +     *    <li><i>length</i> - length of the text line including CRLF characters</li>
    +     *    <li><i>crlfLength</i> - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *                                          or 2 (if line ends with '\r\n').</li>
    +     *    <li><i>startsWithMatch</i> - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.</li>
    +     *  </ul>
    +     *
    +     * @return offset info as <code>long[]</code>
    +     */
    +    public long[] nextOffsetInfo() {
    --- End diff --
    
    Yes, it would be easier to read, but based on running some performance tests there is also a price to pay for it although not very significant. Will change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83261038
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
    +                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null || length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
    +                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        List<ValidationResult> results = new ArrayList<>();
    +        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    +                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    +        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
    +                .explanation("Property must be specified when Line Split Count is 0").build());
    +        return results;
         }
     
    -    private static class SplitInfo {
    -
    -        public long offsetBytes;
    -        public long lengthBytes;
    -        public long lengthLines;
    -        public long bufferedBytes;
    -        public boolean endOfStream;
    -
    -        public SplitInfo() {
    -            this.offsetBytes = 0L;
    -            this.lengthBytes = 0L;
    -            this.lengthLines = 0L;
    -            this.bufferedBytes = 0L;
    -            this.endOfStream = false;
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return Collections.unmodifiableList(properties);
         }
     
    -    public static class EndOfLineBuffer {
    -        private static final byte CARRIAGE_RETURN = (byte) '\r';
    -        private static final byte NEWLINE = (byte) '\n';
    -
    -        private final BitSet buffer = new BitSet();
    -        private int index = 0;
    -
    -        public void clear() {
    -            index = 0;
    -        }
    -
    -        public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
    -            buffer.set(index++, carriageReturn);
    -            buffer.set(index++, newLine);
    -        }
    -
    -        private void drainTo(final OutputStream out) throws IOException {
    -            for (int i = 0; i < index; i += 2) {
    -                final boolean cr = buffer.get(i);
    -                final boolean nl = buffer.get(i + 1);
    -
    -                // we've consumed all data in the buffer
    -                if (!cr && !nl) {
    -                    return;
    -                }
    -
    -                if (cr) {
    -                    out.write(CARRIAGE_RETURN);
    +    /**
    +     *
    +     */
    +   private FlowFile updateAttributes(ProcessSession session, FlowFile splitFlowFile, long lCount, long fSize, String fId, int fIdx, int fCount, String origFname) {
    --- End diff --
    
    should avoid abbreviations of variable names - nifi tends to spell everything out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260605
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    --- End diff --
    
    Should remove blank javadoc lines


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83255847
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    --- End diff --
    
    We should fail-fast here and follow the existing convention of using `if (flowFile == null) { return; }` rather than indenting the entire method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #1116: NIFI-2851 initial commit of perf improvements on SplitText

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/1116
  
    @olegz thanks for jumping on this. Sorry it's taken me so long to get back to it. I verified the changes are good now. I added an additional unit test to verify a corner case that was problematic in the StreamDemarcator and all is looking good. +1 merged to master!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #1116: NIFI-2851 initial commit of perf improvements on SplitText

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on the issue:

    https://github.com/apache/nifi/pull/1116
  
    @markap14 PR comments are addressed. Thanks for reviewing. Back at ya!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83639515
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    --- End diff --
    
    I believe it's OK here since it is completely encapsulated to the SplitText's instance making it essentially an extension to SplitText. I would agree if SplitInfo was 'static class. . .'.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83257640
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    --- End diff --
    
    We should probably be using getters here for startOffset, length, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260348
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
    +                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null || length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
    +                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
    +            context.yield();
    --- End diff --
    
    We should not be yielding in this case. There is nothing preventing the processor from making progress on the next FlowFile.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260548
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
    +                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null || length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
    +                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        List<ValidationResult> results = new ArrayList<>();
    +        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    +                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    +        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
    +                .explanation("Property must be specified when Line Split Count is 0").build());
    +        return results;
         }
     
    -    private static class SplitInfo {
    -
    -        public long offsetBytes;
    -        public long lengthBytes;
    -        public long lengthLines;
    -        public long bufferedBytes;
    -        public boolean endOfStream;
    -
    -        public SplitInfo() {
    -            this.offsetBytes = 0L;
    -            this.lengthBytes = 0L;
    -            this.lengthLines = 0L;
    -            this.bufferedBytes = 0L;
    -            this.endOfStream = false;
    -        }
    +    /**
    --- End diff --
    
    Should remove blank javadoc lines


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83258027
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    --- End diff --
    
    I believe that leaving this as 1 is correct, and that it should not be changed to 0. Changing this would break backward compatibility and provides no real benefit. Though 0 is a more common starting point for developers, 1 is also used commonly enough, and 1 was used as the starting point here specifically because if you look at FlowFile attributes and see, for example, a 3, it is more intuitive to think that this is the 3rd one in a series, rather than the 4th one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83262537
  
    --- Diff: nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/TextLineDemarcator.java ---
    @@ -0,0 +1,227 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.stream.io.util;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * Implementation of demarcator of text lines in the provided
    + * {@link InputStream}. It works similar to the {@link BufferedReader} and its
    + * {@link BufferedReader#readLine()} methods except that it does not create a
    + * String representing the text line and instead returns the offset info for the
    + * computed text line. See {@link #nextOffsetInfo()} and
    + * {@link #nextOffsetInfo(byte[])} for more details.
    + * <p>
    + * This class is NOT thread-safe.
    + * </p>
    + */
    +public class TextLineDemarcator {
    +
    +    private final static int INIT_BUFFER_SIZE = 8192;
    +
    +    private final InputStream is;
    +
    +    private final int initialBufferSize;
    +
    +    private byte[] buffer;
    +
    +    private int index;
    +
    +    private int mark;
    +
    +    private long offset;
    +
    +    private int bufferLength;
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and default buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is) {
    +        this(is, INIT_BUFFER_SIZE);
    +    }
    +
    +    /**
    +     * Constructs an instance of demarcator with provided {@link InputStream}
    +     * and initial buffer size.
    +     */
    +    public TextLineDemarcator(InputStream is, int initialBufferSize) {
    +        if (is == null) {
    +            throw new IllegalArgumentException("'is' must not be null.");
    +        }
    +        if (initialBufferSize < 1) {
    +            throw new IllegalArgumentException("'initialBufferSize' must be > 0.");
    +        }
    +        this.is = is;
    +        this.initialBufferSize = initialBufferSize;
    +        this.buffer = new byte[initialBufferSize];
    +    }
    +
    +    /**
    +     * Will compute the next <i>offset info</i> for a
    +     * text line (line terminated by either '\r', '\n' or '\r\n').
    +     * <br>
    +     * The <i>offset info</i> computed and returned as <code>long[]</code> consisting of
    +     * 4 elements <code>{startOffset, length, crlfLength, startsWithMatch}</code>.
    +     *  <ul>
    +     *    <li><i>startOffset</i> - the offset in the overall stream which represents the beginning of the text line</li>
    +     *    <li><i>length</i> - length of the text line including CRLF characters</li>
    +     *    <li><i>crlfLength</i> - the length of the CRLF. Could be either 1 (if line ends with '\n' or '\r')
    +     *                                          or 2 (if line ends with '\r\n').</li>
    +     *    <li><i>startsWithMatch</i> - value is always 1. See {@link #nextOffsetInfo(byte[])} for more info.</li>
    +     *  </ul>
    +     *
    +     * @return offset info as <code>long[]</code>
    +     */
    +    public long[] nextOffsetInfo() {
    --- End diff --
    
    Why are we returning a long[] here instead of a POJO? This makes the code more difficult to follow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83256735
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    --- End diff --
    
    We should fail-fast here - if error.get() then transfer to failure and return, rather than indenting the next 50 lines or so within a new block


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260507
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
    +                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null || length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
    +                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    +        List<ValidationResult> results = new ArrayList<>();
    +        boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    +                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    +        results.add(new ValidationResult.Builder().subject("Maximum Fragment Size").valid(!invalidState)
    +                .explanation("Property must be specified when Line Split Count is 0").build());
    +        return results;
         }
     
    -    private static class SplitInfo {
    -
    -        public long offsetBytes;
    -        public long lengthBytes;
    -        public long lengthLines;
    -        public long bufferedBytes;
    -        public boolean endOfStream;
    -
    -        public SplitInfo() {
    -            this.offsetBytes = 0L;
    -            this.lengthBytes = 0L;
    -            this.lengthLines = 0L;
    -            this.bufferedBytes = 0L;
    -            this.endOfStream = false;
    -        }
    +    /**
    +     *
    +     */
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return Collections.unmodifiableList(properties);
         }
     
    -    public static class EndOfLineBuffer {
    -        private static final byte CARRIAGE_RETURN = (byte) '\r';
    -        private static final byte NEWLINE = (byte) '\n';
    -
    -        private final BitSet buffer = new BitSet();
    -        private int index = 0;
    -
    -        public void clear() {
    -            index = 0;
    -        }
    -
    -        public void addEndOfLine(final boolean carriageReturn, final boolean newLine) {
    -            buffer.set(index++, carriageReturn);
    -            buffer.set(index++, newLine);
    -        }
    -
    -        private void drainTo(final OutputStream out) throws IOException {
    -            for (int i = 0; i < index; i += 2) {
    -                final boolean cr = buffer.get(i);
    -                final boolean nl = buffer.get(i + 1);
    -
    -                // we've consumed all data in the buffer
    -                if (!cr && !nl) {
    -                    return;
    -                }
    -
    -                if (cr) {
    -                    out.write(CARRIAGE_RETURN);
    +    /**
    --- End diff --
    
    Should remove blank javadoc lines


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83256378
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
    --- End diff --
    
    We should probably be logging e.toString() instead of e.getMessage(), as it provides more details about what went wrong. We should also be logging a toString() of the FlowFile that is being routed to failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1116


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial comit of perf improvements on Spl...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r82414967
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java ---
    @@ -798,6 +798,7 @@ public void testWithSplitThatStartsWithNewLine() {
         }
     
         @Test
    +    @Ignore // temporary, fixing it
    --- End diff --
    
    @olegz did you overlook this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial commit of perf improvements on Sp...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r83260562
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---
    @@ -150,548 +145,320 @@
                 .description("If a file cannot be split for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
                 .build();
     
    -    private List<PropertyDescriptor> properties;
    -    private Set<Relationship> relationships;
    +    private static final List<PropertyDescriptor> properties;
    +    private static final Set<Relationship> relationships;
     
    -    @Override
    -    protected void init(final ProcessorInitializationContext context) {
    -        final List<PropertyDescriptor> properties = new ArrayList<>();
    +    static {
    +        properties = new ArrayList<>();
             properties.add(LINE_SPLIT_COUNT);
             properties.add(FRAGMENT_MAX_SIZE);
             properties.add(HEADER_LINE_COUNT);
             properties.add(HEADER_MARKER);
             properties.add(REMOVE_TRAILING_NEWLINES);
    -        this.properties = Collections.unmodifiableList(properties);
     
    -        final Set<Relationship> relationships = new HashSet<>();
    +        relationships = new HashSet<>();
             relationships.add(REL_ORIGINAL);
             relationships.add(REL_SPLITS);
             relationships.add(REL_FAILURE);
    -        this.relationships = Collections.unmodifiableSet(relationships);
         }
     
    -    @Override
    -    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    -        List<ValidationResult> results = new ArrayList<>();
    -
    -        final boolean invalidState = (validationContext.getProperty(LINE_SPLIT_COUNT).asInteger() == 0
    -                && !validationContext.getProperty(FRAGMENT_MAX_SIZE).isSet());
    -
    -        results.add(new ValidationResult.Builder()
    -            .subject("Maximum Fragment Size")
    -            .valid(!invalidState)
    -            .explanation("Property must be specified when Line Split Count is 0")
    -            .build()
    -        );
    -        return results;
    -    }
    -
    -    @Override
    -    public Set<Relationship> getRelationships() {
    -        return relationships;
    -    }
    -
    -    @Override
    -    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return properties;
    -    }
    -
    -    private int readLines(final InputStream in, final int maxNumLines, final long maxByteCount, final OutputStream out,
    -                          final boolean includeLineDelimiter, final byte[] leadingNewLineBytes) throws IOException {
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -
    -        byte[] leadingBytes = leadingNewLineBytes;
    -        int numLines = 0;
    -        long totalBytes = 0L;
    -        for (int i = 0; i < maxNumLines; i++) {
    -            final EndOfLineMarker eolMarker = countBytesToSplitPoint(in, out, totalBytes, maxByteCount, includeLineDelimiter, eolBuffer, leadingBytes);
    -            final long bytes = eolMarker.getBytesConsumed();
    -            leadingBytes = eolMarker.getLeadingNewLineBytes();
    -
    -            if (includeLineDelimiter && out != null) {
    -                if (leadingBytes != null) {
    -                    out.write(leadingBytes);
    -                    leadingBytes = null;
    -                }
    -                eolBuffer.drainTo(out);
    -            }
    -            totalBytes += bytes;
    -            if (bytes <= 0) {
    -                return numLines;
    -            }
    -            numLines++;
    -            if (totalBytes >= maxByteCount) {
    -                break;
    -            }
    -        }
    -        return numLines;
    -    }
    -
    -    private EndOfLineMarker countBytesToSplitPoint(final InputStream in, final OutputStream out, final long bytesReadSoFar, final long maxSize,
    -                                                   final boolean includeLineDelimiter, final EndOfLineBuffer eolBuffer, final byte[] leadingNewLineBytes) throws IOException {
    -        long bytesRead = 0L;
    -        final ByteArrayOutputStream buffer;
    -        if (out != null) {
    -            buffer = new ByteArrayOutputStream();
    -        } else {
    -            buffer = null;
    -        }
    -        byte[] bytesToWriteFirst = leadingNewLineBytes;
    -
    -        in.mark(Integer.MAX_VALUE);
    -        while (true) {
    -            final int nextByte = in.read();
    -
    -            // if we hit end of stream we're done
    -            if (nextByte == -1) {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, true, bytesToWriteFirst);  // bytesToWriteFirst should be "null"?
    -            }
    +    private volatile boolean removeTrailingNewLines;
     
    -            // Verify leading bytes do not violate size limitation
    -            if (bytesToWriteFirst != null && (bytesToWriteFirst.length + bytesRead) > (maxSize - bytesReadSoFar) && includeLineDelimiter) {
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    -            // Write leadingNewLines, if appropriate
    -            if ( buffer != null && includeLineDelimiter && bytesToWriteFirst != null) {
    -                bytesRead += bytesToWriteFirst.length;
    -                buffer.write(bytesToWriteFirst);
    -                bytesToWriteFirst = null;
    -            }
    -            // buffer the output
    -            bytesRead++;
    -            if (buffer != null && nextByte != '\n' && nextByte != '\r') {
    -                if (bytesToWriteFirst != null) {
    -                    buffer.write(bytesToWriteFirst);
    -                }
    -                bytesToWriteFirst = null;
    -                eolBuffer.drainTo(buffer);
    -                eolBuffer.clear();
    -                buffer.write(nextByte);
    -            }
    +    private volatile long maxSplitSize;
     
    -            // check the size limit
    -            if (bytesRead > (maxSize-bytesReadSoFar) && bytesReadSoFar > 0) {
    -                in.reset();
    -                if (buffer != null) {
    -                    buffer.close();
    -                }
    -                return new EndOfLineMarker(-1, eolBuffer, false, leadingNewLineBytes);
    -            }
    +    private volatile int lineCount;
     
    -            // if we have a new line, then we're done
    -            if (nextByte == '\n') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                    eolBuffer.addEndOfLine(false, true);
    -                }
    -                return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -            }
    +    private volatile int headerLineCount;
     
    -            // Determine if \n follows \r; in either case, end of line has been reached
    -            if (nextByte == '\r') {
    -                if (buffer != null) {
    -                    buffer.writeTo(out);
    -                    buffer.close();
    -                }
    -                in.mark(1);
    -                final int lookAheadByte = in.read();
    -                if (lookAheadByte == '\n') {
    -                    eolBuffer.addEndOfLine(true, true);
    -                    return new EndOfLineMarker(bytesRead + 1, eolBuffer, false, bytesToWriteFirst);
    -                } else {
    -                    in.reset();
    -                    eolBuffer.addEndOfLine(true, false);
    -                    return new EndOfLineMarker(bytesRead, eolBuffer, false, bytesToWriteFirst);
    -                }
    -            }
    -        }
    -    }
    +    private volatile String headerMarker;
     
    -    private SplitInfo locateSplitPoint(final InputStream in, final int numLines, final boolean keepAllNewLines, final long maxSize,
    -                                       final long bufferedBytes) throws IOException {
    -        final SplitInfo info = new SplitInfo();
    -        final EndOfLineBuffer eolBuffer = new EndOfLineBuffer();
    -        int lastByte = -1;
    -        info.lengthBytes = bufferedBytes;
    -        long lastEolBufferLength = 0L;
    -
    -        while ((info.lengthLines < numLines || (info.lengthLines == numLines && lastByte == '\r'))
    -                && (((info.lengthBytes + eolBuffer.length()) < maxSize) || info.lengthLines == 0)
    -                && eolBuffer.length() < maxSize) {
    -            in.mark(1);
    -            final int nextByte = in.read();
    -            // Check for \n following \r on last line
    -            if (info.lengthLines == numLines && lastByte == '\r' && nextByte != '\n') {
    -                in.reset();
    -                break;
    -            }
    -            switch (nextByte) {
    -                case -1:
    -                    info.endOfStream = true;
    -                    if (keepAllNewLines) {
    -                        info.lengthBytes += eolBuffer.length();
    -                    }
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    return info;
    -                case '\r':
    -                    eolBuffer.addEndOfLine(true, false);
    -                    info.lengthLines++;
    -                    info.bufferedBytes = 0;
    -                    break;
    -                case '\n':
    -                    eolBuffer.addEndOfLine(false, true);
    -                    if (lastByte != '\r') {
    -                        info.lengthLines++;
    -                    }
    -                    info.bufferedBytes = 0;
    -                    break;
    -                default:
    -                    if (eolBuffer.length() > 0) {
    -                        info.lengthBytes += eolBuffer.length();
    -                        lastEolBufferLength = eolBuffer.length();
    -                        eolBuffer.clear();
    -                    }
    -                    info.lengthBytes++;
    -                    info.bufferedBytes++;
    -                    break;
    -            }
    -            lastByte = nextByte;
    -        }
    -        // if current line exceeds size and not keeping eol characters, remove previously applied eol characters
    -        if ((info.lengthBytes + eolBuffer.length()) >= maxSize && !keepAllNewLines) {
    -            info.lengthBytes -= lastEolBufferLength;
    -        }
    -        if (keepAllNewLines) {
    -            info.lengthBytes += eolBuffer.length();
    -        }
    -        return info;
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(relationships);
         }
     
    -    private int countHeaderLines(final ByteCountingInputStream in,
    -                                 final String headerMarker) throws IOException {
    -        int headerInfo = 0;
    -
    -        final BufferedReader br = new BufferedReader(new InputStreamReader(in));
    -        in.mark(Integer.MAX_VALUE);
    -        String line = br.readLine();
    -        while (line != null) {
    -            // if line is not a header line, reset stream and return header counts
    -            if (!line.startsWith(headerMarker)) {
    -                in.reset();
    -                return headerInfo;
    -            } else {
    -                headerInfo++;
    -            }
    -            line = br.readLine();
    -        }
    -        in.reset();
    -        return headerInfo;
    +    /**
    +     *
    +     */
    +    @OnScheduled
    +    public void onSchedule(ProcessContext context) {
    +        this.removeTrailingNewLines = context.getProperty(REMOVE_TRAILING_NEWLINES).isSet()
    +                ? context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean() : false;
    +        this.maxSplitSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    +                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    +        this.lineCount = context.getProperty(LINE_SPLIT_COUNT).asInteger();
    +        this.headerLineCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    +        this.headerMarker = context.getProperty(HEADER_MARKER).getValue();
         }
     
    +    /**
    +     * Will split the incoming stream releasing all splits as FlowFile at once.
    +     */
         @Override
    -    public void onTrigger(final ProcessContext context, final ProcessSession session) {
    -        final FlowFile flowFile = session.get();
    -        if (flowFile == null) {
    -            return;
    -        }
    -
    -        final ComponentLog logger = getLogger();
    -        final int headerCount = context.getProperty(HEADER_LINE_COUNT).asInteger();
    -        final int maxLineCount = (context.getProperty(LINE_SPLIT_COUNT).asInteger() == 0)
    -                ? Integer.MAX_VALUE : context.getProperty(LINE_SPLIT_COUNT).asInteger();
    -        final long maxFragmentSize = context.getProperty(FRAGMENT_MAX_SIZE).isSet()
    -                ? context.getProperty(FRAGMENT_MAX_SIZE).asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
    -        final String headerMarker = context.getProperty(HEADER_MARKER).getValue();
    -        final boolean includeLineDelimiter = !context.getProperty(REMOVE_TRAILING_NEWLINES).asBoolean();
    -
    -        final AtomicReference<String> errorMessage = new AtomicReference<>(null);
    -        final ArrayList<SplitInfo> splitInfos = new ArrayList<>();
    -
    -        final long startNanos = System.nanoTime();
    -        final List<FlowFile> splits = new ArrayList<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(final InputStream rawIn) throws IOException {
    -                try (final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn);
    -                        final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
    -
    -                    long bufferedPartialLine = 0;
    -
    -                    // if we have header lines, copy them into a ByteArrayOutputStream
    -                    final ByteArrayOutputStream headerStream = new ByteArrayOutputStream();
    -                    // Determine the number of lines of header, priority given to HEADER_LINE_COUNT property
    -                    int headerInfoLineCount = 0;
    -                    if (headerCount > 0) {
    -                        headerInfoLineCount = headerCount;
    -                    } else {
    -                        if (headerMarker != null) {
    -                            headerInfoLineCount = countHeaderLines(in, headerMarker);
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile != null) {
    +            AtomicBoolean error = new AtomicBoolean();
    +            List<FlowFile> splitFlowFiles = new ArrayList<>();
    +            List<SplitInfo> computedSplitsInfo = new ArrayList<>();
    +            AtomicReference<SplitInfo> headerSplitInfoRef = new AtomicReference<>();
    +            session.read(flowFile, new InputStreamCallback() {
    +                @Override
    +                public void process(InputStream in) throws IOException {
    +                    TextLineDemarcator demarcator = new TextLineDemarcator(in);
    +                    SplitInfo splitInfo = null;
    +                    long startOffset = 0;
    +
    +                    // Compute fragment representing the header (if available)
    +                    long start = System.nanoTime();
    +                    try {
    +                        if (SplitText.this.headerLineCount > 0) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, SplitText.this.headerLineCount, null, null);
    +                            if (splitInfo.lineCount < SplitText.this.headerLineCount) {
    +                                error.set(true);
    +                                getLogger().error("Unable to split " + flowFile + " due to insufficient amount of header lines. Required "
    +                                        + SplitText.this.headerLineCount + " but was " + splitInfo.lineCount + ". Routing to failure.");
    +                            }
    +                        } else if (SplitText.this.headerMarker != null) {
    +                            splitInfo = SplitText.this.computeHeader(demarcator, startOffset, Long.MAX_VALUE, SplitText.this.headerMarker.getBytes(StandardCharsets.UTF_8), null);
                             }
    +                        headerSplitInfoRef.set(splitInfo);
    +                    } catch (IllegalStateException e) {
    +                        error.set(true);
    +                        getLogger().error(e.getMessage() + " Routing to failure.");
                         }
     
    -                    final byte[] headerNewLineBytes;
    -                    final byte[] headerBytesWithoutTrailingNewLines;
    -                    if (headerInfoLineCount > 0) {
    -                        final int headerLinesCopied = readLines(in, headerInfoLineCount, Long.MAX_VALUE, headerStream, true, null);
    -
    -                        if (headerLinesCopied < headerInfoLineCount) {
    -                            errorMessage.set("Header Line Count is set to " + headerInfoLineCount + " but file had only " + headerLinesCopied + " lines");
    -                            return;
    +                    // Compute and collect fragments representing the individual splits
    +                    if (!error.get()) {
    +                        if (headerSplitInfoRef.get() != null) {
    +                            startOffset = headerSplitInfoRef.get().length;
                             }
    -
    -                        // Break header apart into trailing newlines and remaining text
    -                        final byte[] headerBytes = headerStream.toByteArray();
    -                        int headerNewLineByteCount = 0;
    -                        for (int i = headerBytes.length - 1; i >= 0; i--) {
    -                            final byte headerByte = headerBytes[i];
    -
    -                            if (headerByte == '\r' || headerByte == '\n') {
    -                                headerNewLineByteCount++;
    -                            } else {
    -                                break;
    -                            }
    +                        long preAccumulatedLength = startOffset;
    +                        while ((splitInfo = SplitText.this.nextSplit(demarcator, startOffset, SplitText.this.lineCount, splitInfo, preAccumulatedLength)) != null) {
    +                            computedSplitsInfo.add(splitInfo);
    +                            startOffset += splitInfo.length;
                             }
    -
    -                        if (headerNewLineByteCount == 0) {
    -                            headerNewLineBytes = null;
    -                            headerBytesWithoutTrailingNewLines = headerBytes;
    -                        } else {
    -                            headerNewLineBytes = new byte[headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, headerBytes.length - headerNewLineByteCount, headerNewLineBytes, 0, headerNewLineByteCount);
    -
    -                            headerBytesWithoutTrailingNewLines = new byte[headerBytes.length - headerNewLineByteCount];
    -                            System.arraycopy(headerBytes, 0, headerBytesWithoutTrailingNewLines, 0, headerBytes.length - headerNewLineByteCount);
    +                        long stop = System.nanoTime();
    +                        if (getLogger().isDebugEnabled()) {
    +                            getLogger().debug("Computed splits in " + (stop - start) + " milliseconds.");
                             }
    -                    } else {
    -                        headerBytesWithoutTrailingNewLines = null;
    -                        headerNewLineBytes = null;
                         }
    -
    -                    while (true) {
    -                        if (headerInfoLineCount > 0) {
    -                            // if we have header lines, create a new FlowFile, copy the header lines to that file,
    -                            // and then start copying lines
    -                            final AtomicInteger linesCopied = new AtomicInteger(0);
    -                            final AtomicLong bytesCopied = new AtomicLong(0L);
    -                            FlowFile splitFile = session.create(flowFile);
    -                            try {
    -                                splitFile = session.write(splitFile, new OutputStreamCallback() {
    -                                    @Override
    -                                    public void process(final OutputStream rawOut) throws IOException {
    -                                        try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
    -                                                final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(out)) {
    -                                            countingOut.write(headerBytesWithoutTrailingNewLines);
    -                                            //readLines has an offset of countingOut.getBytesWritten() to allow for header bytes written already
    -                                            linesCopied.set(readLines(in, maxLineCount, maxFragmentSize - countingOut.getBytesWritten(), countingOut,
    -                                                    includeLineDelimiter, headerNewLineBytes));
    -                                            bytesCopied.set(countingOut.getBytesWritten());
    -                                        }
    -                                    }
    -                                });
    -                                splitFile = session.putAttribute(splitFile, SPLIT_LINE_COUNT, String.valueOf(linesCopied.get()));
    -                                splitFile = session.putAttribute(splitFile, FRAGMENT_SIZE, String.valueOf(bytesCopied.get()));
    -                                logger.debug("Created Split File {} with {} lines, {} bytes", new Object[]{splitFile, linesCopied.get(), bytesCopied.get()});
    -                            } finally {
    -                                if (linesCopied.get() > 0) {
    -                                    splits.add(splitFile);
    +                }
    +            });
    +            if (!error.get()) {
    +                FlowFile headerFlowFile = null;
    +                long headerCrlfLength = 0;
    +                if (headerSplitInfoRef.get() != null) {
    +                    headerFlowFile = session.clone(flowFile, headerSplitInfoRef.get().startOffset, headerSplitInfoRef.get().length);
    +                    headerCrlfLength = headerSplitInfoRef.get().trimmedLength;
    +                }
    +                int fragmentIndex = 1; // set to 1 to preserve the existing behavior *only*. Perhaps should be deprecated to follow the 0,1,2... scheme
    +                String fragmentId = UUID.randomUUID().toString();
    +
    +                if (computedSplitsInfo.size() == 0) {
    +                    FlowFile splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength);
    +                    splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, 0, splitFlowFile.getSize(),
    +                            fragmentId, fragmentIndex++, 0, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                    splitFlowFiles.add(splitFlowFile);
    +                } else {
    +                    for (SplitInfo computedSplitInfo : computedSplitsInfo) {
    +                        long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length;
    +                        boolean proceedWithClone = headerFlowFile != null || length > 0;
    +                        if (proceedWithClone) {
    +                            FlowFile splitFlowFile = null;
    +                            if (headerFlowFile != null) {
    +                                if (length > 0) {
    +                                    splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
    +                                    splitFlowFile = session.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
                                     } else {
    -                                    // if the number of content lines is a multiple of the SPLIT_LINE_COUNT,
    -                                    // the last flow file will contain just a header; don't forward that one
    -                                    session.remove(splitFile);
    +                                    splitFlowFile = session.clone(flowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                                     }
    -                            }
    -
    -                            // Check for EOF
    -                            in.mark(1);
    -                            if (in.read() == -1) {
    -                                break;
    -                            }
    -                            in.reset();
    -
    -                        } else {
    -                            // We have no header lines, so we can simply demarcate the original File via the
    -                            // ProcessSession#clone method.
    -                            long beforeReadingLines = in.getBytesConsumed() - bufferedPartialLine;
    -                            final SplitInfo info = locateSplitPoint(in, maxLineCount, includeLineDelimiter, maxFragmentSize, bufferedPartialLine);
    -                            if (context.getProperty(FRAGMENT_MAX_SIZE).isSet()) {
    -                                bufferedPartialLine = info.bufferedBytes;
    -                            }
    -                            if (info.endOfStream) {
    -                                // stream is out of data
    -                                if (info.lengthBytes > 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    -                                break;
                                 } else {
    -                                if (info.lengthBytes != 0) {
    -                                    info.offsetBytes = beforeReadingLines;
    -                                    info.lengthBytes -= bufferedPartialLine;
    -                                    splitInfos.add(info);
    -                                    final long procNanos = System.nanoTime() - startNanos;
    -                                    final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
    -                                    logger.debug("Detected start of Split File in {} at byte offset {} with a length of {} bytes; "
    -                                                    + "total splits = {}; total processing time = {} ms",
    -                                            new Object[]{flowFile, beforeReadingLines, info.lengthBytes, splitInfos.size(), procMillis});
    -                                }
    +                                splitFlowFile = session.clone(flowFile, computedSplitInfo.startOffset, length);
                                 }
    +
    +                            splitFlowFile = SplitText.this.updateAttributes(session, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++,
    +                                    computedSplitsInfo.size(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
    +                            splitFlowFiles.add(splitFlowFile);
                             }
                         }
                     }
    -            }
    -        });
     
    -        if (errorMessage.get() != null) {
    -            logger.error("Unable to split {} due to {}; routing to failure", new Object[]{flowFile, errorMessage.get()});
    -            session.transfer(flowFile, REL_FAILURE);
    -            if (!splits.isEmpty()) {
    -                session.remove(splits);
    +                getLogger().info("Split " + flowFile + " into " + splitFlowFiles.size() + " flow files" + (headerFlowFile != null ? " containing headers." : "."));
    +                if (headerFlowFile != null) {
    +                    session.remove(headerFlowFile);
    +                }
                 }
    -            return;
    -        }
     
    -        if (!splitInfos.isEmpty()) {
    -            // Create the splits
    -            for (final SplitInfo info : splitInfos) {
    -                FlowFile split = session.clone(flowFile, info.offsetBytes, info.lengthBytes);
    -                split = session.putAttribute(split, SPLIT_LINE_COUNT, String.valueOf(info.lengthLines));
    -                split = session.putAttribute(split, FRAGMENT_SIZE, String.valueOf(info.lengthBytes));
    -                splits.add(split);
    +            if (error.get()) {
    +                session.transfer(flowFile, REL_FAILURE);
    +            } else {
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(splitFlowFiles, REL_SPLITS);
                 }
    -        }
    -        finishFragmentAttributes(session, flowFile, splits);
    -
    -        if (splits.size() > 10) {
    -            logger.info("Split {} into {} files", new Object[]{flowFile, splits.size()});
             } else {
    -            logger.info("Split {} into {} files: {}", new Object[]{flowFile, splits.size(), splits});
    +            context.yield();
             }
    -
    -        session.transfer(flowFile, REL_ORIGINAL);
    -        session.transfer(splits, REL_SPLITS);
         }
     
    -    private void finishFragmentAttributes(final ProcessSession session, final FlowFile source, final List<FlowFile> splits) {
    -        final String originalFilename = source.getAttribute(CoreAttributes.FILENAME.key());
    -
    -        final String fragmentId = UUID.randomUUID().toString();
    -        final ArrayList<FlowFile> newList = new ArrayList<>(splits);
    -        splits.clear();
    -        for (int i = 1; i <= newList.size(); i++) {
    -            FlowFile ff = newList.get(i - 1);
    -            final Map<String, String> attributes = new HashMap<>();
    -            attributes.put(FRAGMENT_ID, fragmentId);
    -            attributes.put(FRAGMENT_INDEX, String.valueOf(i));
    -            attributes.put(FRAGMENT_COUNT, String.valueOf(newList.size()));
    -            attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFilename);
    -            FlowFile newFF = session.putAllAttributes(ff, attributes);
    -            splits.add(newFF);
    -        }
    +    /**
    --- End diff --
    
    Should remove blank javadoc lines


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi pull request #1116: NIFI-2851 initial comit of perf improvements on Spl...

Posted by olegz <gi...@git.apache.org>.
Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1116#discussion_r82415774
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitText.java ---
    @@ -798,6 +798,7 @@ public void testWithSplitThatStartsWithNewLine() {
         }
     
         @Test
    +    @Ignore // temporary, fixing it
    --- End diff --
    
    No, didn't want to hold the PR as this is kind of an edge case which I hope to address after thinking about it a bit. 
    Basically the assertion fails here ```splits.get(1).assertContentEquals("");``` since it now comes it as new line. But I also wonder why is this a split to begin with with no data?
    So, i wanted to look some more as well as see if it is actually a bug that needs to be fixed, but didn't want to hold the PR since regardless this is an edge case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---