You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/20 06:52:27 UTC

[5/8] incubator-nifi git commit: NIFI-189 Collapsed several commons libs into one

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
new file mode 100644
index 0000000..284cd54
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.InputStream;
+
+/**
+ * This class performs the same function as java.io.ByteArrayInputStream but
+ * does not mark its methods as synchronized
+ */
+public class ByteArrayInputStream extends InputStream {
+
+    /**
+     * An array of bytes that was provided by the creator of the stream.
+     * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the
+     * only bytes that can ever be read from the stream; element
+     * <code>buf[pos]</code> is the next byte to be read.
+     */
+    protected byte buf[];
+
+    /**
+     * The index of the next character to read from the input stream buffer.
+     * This value should always be nonnegative and not larger than the value of
+     * <code>count</code>. The next byte to be read from the input stream buffer
+     * will be <code>buf[pos]</code>.
+     */
+    protected int pos;
+
+    /**
+     * The currently marked position in the stream. ByteArrayInputStream objects
+     * are marked at position zero by default when constructed. They may be
+     * marked at another position within the buffer by the <code>mark()</code>
+     * method. The current buffer position is set to this point by the
+     * <code>reset()</code> method.
+     * <p>
+     * If no mark has been set, then the value of mark is the offset passed to
+     * the constructor (or 0 if the offset was not supplied).
+     *
+     * @since JDK1.1
+     */
+    protected int mark = 0;
+
+    /**
+     * The index one greater than the last valid character in the input stream
+     * buffer. This value should always be nonnegative and not larger than the
+     * length of <code>buf</code>. It is one greater than the position of the
+     * last byte within <code>buf</code> that can ever be read from the input
+     * stream buffer.
+     */
+    protected int count;
+
+    /**
+     * Creates a <code>ByteArrayInputStream</code> so that it uses
+     * <code>buf</code> as its buffer array. The buffer array is not copied. The
+     * initial value of <code>pos</code> is <code>0</code> and the initial value
+     * of  <code>count</code> is the length of <code>buf</code>.
+     *
+     * @param buf the input buffer.
+     */
+    public ByteArrayInputStream(byte buf[]) {
+        this.buf = buf;
+        this.pos = 0;
+        this.count = buf.length;
+    }
+
+    /**
+     * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as
+     * its buffer array. The initial value of <code>pos</code> is
+     * <code>offset</code> and the initial value of <code>count</code> is the
+     * minimum of <code>offset+length</code> and <code>buf.length</code>. The
+     * buffer array is not copied. The buffer's mark is set to the specified
+     * offset.
+     *
+     * @param buf the input buffer.
+     * @param offset the offset in the buffer of the first byte to read.
+     * @param length the maximum number of bytes to read from the buffer.
+     */
+    public ByteArrayInputStream(byte buf[], int offset, int length) {
+        this.buf = buf;
+        this.pos = offset;
+        this.count = Math.min(offset + length, buf.length);
+        this.mark = offset;
+    }
+
+    /**
+     * Reads the next byte of data from this input stream. The value byte is
+     * returned as an <code>int</code> in the range <code>0</code> to
+     * <code>255</code>. If no byte is available because the end of the stream
+     * has been reached, the value <code>-1</code> is returned.
+     * <p>
+     * This <code>read</code> method cannot block.
+     *
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     * stream has been reached.
+     */
+    @Override
+    public int read() {
+        return (pos < count) ? (buf[pos++] & 0xff) : -1;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes of data into an array of bytes from
+     * this input stream. If <code>pos</code> equals <code>count</code>, then
+     * <code>-1</code> is returned to indicate end of file. Otherwise, the
+     * number <code>k</code> of bytes read is equal to the smaller of
+     * <code>len</code> and <code>count-pos</code>. If <code>k</code> is
+     * positive, then bytes <code>buf[pos]</code> through
+     * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through
+     * <code>b[off+k-1]</code> in the manner performed by
+     * <code>System.arraycopy</code>. The value <code>k</code> is added into
+     * <code>pos</code> and <code>k</code> is returned.
+     * <p>
+     * This <code>read</code> method cannot block.
+     *
+     * @param b the buffer into which the data is read.
+     * @param off the start offset in the destination array <code>b</code>
+     * @param len the maximum number of bytes read.
+     * @return the total number of bytes read into the buffer, or
+     * <code>-1</code> if there is no more data because the end of the stream
+     * has been reached.
+     * @exception NullPointerException If <code>b</code> is <code>null</code>.
+     * @exception IndexOutOfBoundsException If <code>off</code> is negative,
+     * <code>len</code> is negative, or <code>len</code> is greater than
+     * <code>b.length - off</code>
+     */
+    @Override
+    public int read(byte b[], int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        if (pos >= count) {
+            return -1;
+        }
+
+        int avail = count - pos;
+        if (len > avail) {
+            len = avail;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        System.arraycopy(buf, pos, b, off, len);
+        pos += len;
+        return len;
+    }
+
+    /**
+     * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
+     * might be skipped if the end of the input stream is reached. The actual
+     * number <code>k</code> of bytes to be skipped is equal to the smaller of
+     * <code>n</code> and  <code>count-pos</code>. The value <code>k</code> is
+     * added into <code>pos</code> and <code>k</code> is returned.
+     *
+     * @param n the number of bytes to be skipped.
+     * @return the actual number of bytes skipped.
+     */
+    @Override
+    public long skip(long n) {
+        long k = count - pos;
+        if (n < k) {
+            k = n < 0 ? 0 : n;
+        }
+
+        pos += k;
+        return k;
+    }
+
+    /**
+     * Returns the number of remaining bytes that can be read (or skipped over)
+     * from this input stream.
+     * <p>
+     * The value returned is <code>count&nbsp;- pos</code>, which is the number
+     * of bytes remaining to be read from the input buffer.
+     *
+     * @return the number of remaining bytes that can be read (or skipped over)
+     * from this input stream without blocking.
+     */
+    @Override
+    public int available() {
+        return count - pos;
+    }
+
+    /**
+     * Tests if this <code>InputStream</code> supports mark/reset. The
+     * <code>markSupported</code> method of <code>ByteArrayInputStream</code>
+     * always returns <code>true</code>.
+     *
+     * @since JDK1.1
+     */
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    /**
+     * Set the current marked position in the stream. ByteArrayInputStream
+     * objects are marked at position zero by default when constructed. They may
+     * be marked at another position within the buffer by this method.
+     * <p>
+     * If no mark has been set, then the value of the mark is the offset passed
+     * to the constructor (or 0 if the offset was not supplied).
+     *
+     * <p>
+     * Note: The <code>readAheadLimit</code> for this class has no meaning.
+     *
+     * @since JDK1.1
+     */
+    @Override
+    public void mark(int readAheadLimit) {
+        mark = pos;
+    }
+
+    /**
+     * Resets the buffer to the marked position. The marked position is 0 unless
+     * another position was marked or an offset was specified in the
+     * constructor.
+     */
+    @Override
+    public void reset() {
+        pos = mark;
+    }
+
+    /**
+     * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
+     * this class can be called after the stream has been closed without
+     * generating an <tt>IOException</tt>.
+     * <p>
+     */
+    @Override
+    public void close() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
new file mode 100644
index 0000000..459563b
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * This class provides a more efficient implementation of the
+ * java.io.ByteArrayOutputStream. The efficiency is gained in two ways:
+ * <ul>
+ * <li>The write methods are not synchronized</li>
+ * <li>The class provides {@link #getUnderlyingBuffer()} and
+ * {@link #getBufferLength()}, which can be used to access the underlying byte
+ * array directly, rather than the System.arraycopy that {@link #toByteArray()}
+ * uses
+ * </ul>
+ *
+ */
+public class ByteArrayOutputStream extends OutputStream {
+
+    /**
+     * The buffer where data is stored.
+     */
+    protected byte buf[];
+
+    /**
+     * The number of valid bytes in the buffer.
+     */
+    protected int count;
+
+    /**
+     * Creates a new byte array output stream. The buffer capacity is initially
+     * 32 bytes, though its size increases if necessary.
+     */
+    public ByteArrayOutputStream() {
+        this(32);
+    }
+
+    /**
+     * Creates a new byte array output stream, with a buffer capacity of the
+     * specified size, in bytes.
+     *
+     * @param size the initial size.
+     * @exception IllegalArgumentException if size is negative.
+     */
+    public ByteArrayOutputStream(int size) {
+        if (size < 0) {
+            throw new IllegalArgumentException("Negative initial size: "
+                    + size);
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Increases the capacity if necessary to ensure that it can hold at least
+     * the number of elements specified by the minimum capacity argument.
+     *
+     * @param minCapacity the desired minimum capacity
+     * @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted
+     * as a request for the unsatisfiably large capacity
+     * {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
+     */
+    private void ensureCapacity(int minCapacity) {
+        // overflow-conscious code
+        if (minCapacity - buf.length > 0) {
+            grow(minCapacity);
+        }
+    }
+
+    /**
+     * Increases the capacity to ensure that it can hold at least the number of
+     * elements specified by the minimum capacity argument.
+     *
+     * @param minCapacity the desired minimum capacity
+     */
+    private void grow(int minCapacity) {
+        // overflow-conscious code
+        int oldCapacity = buf.length;
+        int newCapacity = oldCapacity << 1;
+        if (newCapacity - minCapacity < 0) {
+            newCapacity = minCapacity;
+        }
+        if (newCapacity < 0) {
+            if (minCapacity < 0) // overflow
+            {
+                throw new OutOfMemoryError();
+            }
+            newCapacity = Integer.MAX_VALUE;
+        }
+        buf = Arrays.copyOf(buf, newCapacity);
+    }
+
+    /**
+     * Writes the specified byte to this byte array output stream.
+     *
+     * @param b the byte to be written.
+     */
+    @Override
+    public void write(int b) {
+        ensureCapacity(count + 1);
+        buf[count] = (byte) b;
+        count += 1;
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to this byte array output stream.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     */
+    @Override
+    public void write(byte b[], int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0)
+                || ((off + len) - b.length > 0)) {
+            throw new IndexOutOfBoundsException();
+        }
+        ensureCapacity(count + len);
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Writes the complete contents of this byte array output stream to the
+     * specified output stream argument, as if by calling the output stream's
+     * write method using <code>out.write(buf, 0, count)</code>.
+     *
+     * @param out the output stream to which to write the data.
+     * @exception IOException if an I/O error occurs.
+     */
+    public void writeTo(OutputStream out) throws IOException {
+        out.write(buf, 0, count);
+    }
+
+    /**
+     * Resets the <code>count</code> field of this byte array output stream to
+     * zero, so that all currently accumulated output in the output stream is
+     * discarded. The output stream can be used again, reusing the already
+     * allocated buffer space.
+     *
+     * @see java.io.ByteArrayInputStream#count
+     */
+    public void reset() {
+        count = 0;
+    }
+
+    /**
+     * Creates a newly allocated byte array. Its size is the current size of
+     * this output stream and the valid contents of the buffer have been copied
+     * into it.
+     *
+     * @return the current contents of this output stream, as a byte array.
+     * @see java.io.ByteArrayOutputStream#size()
+     */
+    public byte toByteArray   () 
+        [] {
+        return Arrays.copyOf(buf, count);
+    }
+
+    /**
+     * Returns the current size of the buffer.
+     *
+     * @return the value of the <code>count</code> field, which is the number of
+     * valid bytes in this output stream.
+     * @see java.io.ByteArrayOutputStream#count
+     */
+    public int size() {
+        return count;
+    }
+
+    /**
+     * Converts the buffer's contents into a string decoding bytes using the
+     * platform's default character set. The length of the new <tt>String</tt>
+     * is a function of the character set, and hence may not be equal to the
+     * size of the buffer.
+     *
+     * <p>
+     * This method always replaces malformed-input and unmappable-character
+     * sequences with the default replacement string for the platform's default
+     * character set. The {@linkplain java.nio.charset.CharsetDecoder} class
+     * should be used when more control over the decoding process is required.
+     *
+     * @return String decoded from the buffer's contents.
+     * @since JDK1.1
+     */
+    @Override
+    public String toString() {
+        return new String(buf, 0, count);
+    }
+
+    /**
+     * Converts the buffer's contents into a string by decoding the bytes using
+     * the specified {@link java.nio.charset.Charset charsetName}. The length of
+     * the new <tt>String</tt> is a function of the charset, and hence may not
+     * be equal to the length of the byte array.
+     *
+     * <p>
+     * This method always replaces malformed-input and unmappable-character
+     * sequences with this charset's default replacement string. The {@link
+     * java.nio.charset.CharsetDecoder} class should be used when more control
+     * over the decoding process is required.
+     *
+     * @param charsetName the name of a supported
+     *              {@linkplain java.nio.charset.Charset <code>charset</code>}
+     * @return String decoded from the buffer's contents.
+     * @exception UnsupportedEncodingException If the named charset is not
+     * supported
+     * @since JDK1.1
+     */
+    public String toString(String charsetName) throws UnsupportedEncodingException {
+        return new String(buf, 0, count, charsetName);
+    }
+
+    /**
+     * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
+     * this class can be called after the stream has been closed without
+     * generating an <tt>IOException</tt>.
+     * <p>
+     *
+     */
+    @Override
+    public void close() {
+    }
+
+    public byte[] getUnderlyingBuffer() {
+        return buf;
+    }
+
+    public int getBufferLength() {
+        return count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
new file mode 100644
index 0000000..8294af3
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ByteCountingInputStream extends InputStream {
+
+    private final InputStream in;
+    private long bytesRead = 0L;
+    private long bytesSkipped = 0L;
+
+    private long bytesSinceMark = 0L;
+
+    public ByteCountingInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+        final int fromSuper = in.read();
+        if (fromSuper >= 0) {
+            bytesRead++;
+            bytesSinceMark++;
+        }
+        return fromSuper;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        final int fromSuper = in.read(b, off, len);
+        if (fromSuper >= 0) {
+            bytesRead += fromSuper;
+            bytesSinceMark += fromSuper;
+        }
+
+        return fromSuper;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        final long skipped = in.skip(n);
+        if (skipped >= 0) {
+            bytesSkipped += skipped;
+            bytesSinceMark += skipped;
+        }
+        return skipped;
+    }
+
+    public long getBytesRead() {
+        return bytesRead;
+    }
+
+    public long getBytesSkipped() {
+        return bytesSkipped;
+    }
+
+    public long getBytesConsumed() {
+        return getBytesRead() + getBytesSkipped();
+    }
+
+    @Override
+    public void mark(final int readlimit) {
+        in.mark(readlimit);
+
+        bytesSinceMark = 0L;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+
+    @Override
+    public void reset() throws IOException {
+        in.reset();
+        bytesRead -= bytesSinceMark;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
new file mode 100644
index 0000000..d8e1a42
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ByteCountingOutputStream extends OutputStream {
+
+    private final OutputStream out;
+    private long bytesWritten = 0L;
+
+    public ByteCountingOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+        bytesWritten++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    ;
+    
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+        bytesWritten += len;
+    }
+
+    public long getBytesWritten() {
+        return bytesWritten;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        out.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
new file mode 100644
index 0000000..1dd90f5
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.DataOutput;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * This class is different from java.io.DataOutputStream in that it does
+ * synchronize on its methods.
+ */
+public class DataOutputStream extends FilterOutputStream implements DataOutput {
+
+    /**
+     * The number of bytes written to the data output stream so far. If this
+     * counter overflows, it will be wrapped to Integer.MAX_VALUE.
+     */
+    protected int written;
+
+    /**
+     * bytearr is initialized on demand by writeUTF
+     */
+    private byte[] bytearr = null;
+
+    /**
+     * Creates a new data output stream to write data to the specified
+     * underlying output stream. The counter <code>written</code> is set to
+     * zero.
+     *
+     * @param out the underlying output stream, to be saved for later use.
+     * @see java.io.FilterOutputStream#out
+     */
+    public DataOutputStream(OutputStream out) {
+        super(out);
+    }
+
+    /**
+     * Increases the written counter by the specified value until it reaches
+     * Integer.MAX_VALUE.
+     */
+    private void incCount(int value) {
+        int temp = written + value;
+        if (temp < 0) {
+            temp = Integer.MAX_VALUE;
+        }
+        written = temp;
+    }
+
+    /**
+     * Writes the specified byte (the low eight bits of the argument
+     * <code>b</code>) to the underlying output stream. If no exception is
+     * thrown, the counter <code>written</code> is incremented by
+     * <code>1</code>.
+     * <p>
+     * Implements the <code>write</code> method of <code>OutputStream</code>.
+     *
+     * @param b the <code>byte</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+        incCount(1);
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to the underlying output stream. If no exception
+     * is thrown, the counter <code>written</code> is incremented by
+     * <code>len</code>.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+        out.write(b, off, len);
+        incCount(len);
+    }
+
+    /**
+     * Flushes this data output stream. This forces any buffered output bytes to
+     * be written out to the stream.
+     * <p>
+     * The <code>flush</code> method of <code>DataOutputStream</code> calls the
+     * <code>flush</code> method of its underlying output stream.
+     *
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     * @see java.io.OutputStream#flush()
+     */
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    /**
+     * Writes a <code>boolean</code> to the underlying output stream as a 1-byte
+     * value. The value <code>true</code> is written out as the value
+     * <code>(byte)1</code>; the value <code>false</code> is written out as the
+     * value <code>(byte)0</code>. If no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>1</code>.
+     *
+     * @param v a <code>boolean</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeBoolean(boolean v) throws IOException {
+        out.write(v ? 1 : 0);
+        incCount(1);
+    }
+
+    /**
+     * Writes out a <code>byte</code> to the underlying output stream as a
+     * 1-byte value. If no exception is thrown, the counter <code>written</code>
+     * is incremented by <code>1</code>.
+     *
+     * @param v a <code>byte</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeByte(int v) throws IOException {
+        out.write(v);
+        incCount(1);
+    }
+
+    /**
+     * Writes a <code>short</code> to the underlying output stream as two bytes,
+     * high byte first. If no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>2</code>.
+     *
+     * @param v a <code>short</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeShort(int v) throws IOException {
+        out.write((v >>> 8) & 0xFF);
+        out.write((v) & 0xFF);
+        incCount(2);
+    }
+
+    /**
+     * Writes a <code>char</code> to the underlying output stream as a 2-byte
+     * value, high byte first. If no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>2</code>.
+     *
+     * @param v a <code>char</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeChar(int v) throws IOException {
+        out.write((v >>> 8) & 0xFF);
+        out.write((v) & 0xFF);
+        incCount(2);
+    }
+
+    /**
+     * Writes an <code>int</code> to the underlying output stream as four bytes,
+     * high byte first. If no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>4</code>.
+     *
+     * @param v an <code>int</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeInt(int v) throws IOException {
+        out.write((v >>> 24) & 0xFF);
+        out.write((v >>> 16) & 0xFF);
+        out.write((v >>> 8) & 0xFF);
+        out.write((v) & 0xFF);
+        incCount(4);
+    }
+
+    private final byte writeBuffer[] = new byte[8];
+
+    /**
+     * Writes a <code>long</code> to the underlying output stream as eight
+     * bytes, high byte first. In no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>8</code>.
+     *
+     * @param v a <code>long</code> to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeLong(long v) throws IOException {
+        writeBuffer[0] = (byte) (v >>> 56);
+        writeBuffer[1] = (byte) (v >>> 48);
+        writeBuffer[2] = (byte) (v >>> 40);
+        writeBuffer[3] = (byte) (v >>> 32);
+        writeBuffer[4] = (byte) (v >>> 24);
+        writeBuffer[5] = (byte) (v >>> 16);
+        writeBuffer[6] = (byte) (v >>> 8);
+        writeBuffer[7] = (byte) (v);
+        out.write(writeBuffer, 0, 8);
+        incCount(8);
+    }
+
+    /**
+     * Converts the float argument to an <code>int</code> using the
+     * <code>floatToIntBits</code> method in class <code>Float</code>, and then
+     * writes that <code>int</code> value to the underlying output stream as a
+     * 4-byte quantity, high byte first. If no exception is thrown, the counter
+     * <code>written</code> is incremented by <code>4</code>.
+     *
+     * @param v a <code>float</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     * @see java.lang.Float#floatToIntBits(float)
+     */
+    @Override
+    public final void writeFloat(float v) throws IOException {
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    /**
+     * Converts the double argument to a <code>long</code> using the
+     * <code>doubleToLongBits</code> method in class <code>Double</code>, and
+     * then writes that <code>long</code> value to the underlying output stream
+     * as an 8-byte quantity, high byte first. If no exception is thrown, the
+     * counter <code>written</code> is incremented by <code>8</code>.
+     *
+     * @param v a <code>double</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     * @see java.lang.Double#doubleToLongBits(double)
+     */
+    @Override
+    public final void writeDouble(double v) throws IOException {
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    /**
+     * Writes out the string to the underlying output stream as a sequence of
+     * bytes. Each character in the string is written out, in sequence, by
+     * discarding its high eight bits. If no exception is thrown, the counter
+     * <code>written</code> is incremented by the length of <code>s</code>.
+     *
+     * @param s a string of bytes to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeBytes(String s) throws IOException {
+        int len = s.length();
+        for (int i = 0; i < len; i++) {
+            out.write((byte) s.charAt(i));
+        }
+        incCount(len);
+    }
+
+    /**
+     * Writes a string to the underlying output stream as a sequence of
+     * characters. Each character is written to the data output stream as if by
+     * the <code>writeChar</code> method. If no exception is thrown, the counter
+     * <code>written</code> is incremented by twice the length of
+     * <code>s</code>.
+     *
+     * @param s a <code>String</code> value to be written.
+     * @exception IOException if an I/O error occurs.
+     * @see java.io.DataOutputStream#writeChar(int)
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public final void writeChars(String s) throws IOException {
+        int len = s.length();
+        for (int i = 0; i < len; i++) {
+            int v = s.charAt(i);
+            out.write((v >>> 8) & 0xFF);
+            out.write((v) & 0xFF);
+        }
+        incCount(len * 2);
+    }
+
+    /**
+     * Writes a string to the underlying output stream using
+     * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
+     * encoding in a machine-independent manner.
+     * <p>
+     * First, two bytes are written to the output stream as if by the
+     * <code>writeShort</code> method giving the number of bytes to follow. This
+     * value is the number of bytes actually written out, not the length of the
+     * string. Following the length, each character of the string is output, in
+     * sequence, using the modified UTF-8 encoding for the character. If no
+     * exception is thrown, the counter <code>written</code> is incremented by
+     * the total number of bytes written to the output stream. This will be at
+     * least two plus the length of <code>str</code>, and at most two plus
+     * thrice the length of <code>str</code>.
+     *
+     * @param str a string to be written.
+     * @exception IOException if an I/O error occurs.
+     */
+    @Override
+    public final void writeUTF(String str) throws IOException {
+        writeUTF(str, this);
+    }
+
+    /**
+     * Writes a string to the specified DataOutput using
+     * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
+     * encoding in a machine-independent manner.
+     * <p>
+     * First, two bytes are written to out as if by the <code>writeShort</code>
+     * method giving the number of bytes to follow. This value is the number of
+     * bytes actually written out, not the length of the string. Following the
+     * length, each character of the string is output, in sequence, using the
+     * modified UTF-8 encoding for the character. If no exception is thrown, the
+     * counter <code>written</code> is incremented by the total number of bytes
+     * written to the output stream. This will be at least two plus the length
+     * of <code>str</code>, and at most two plus thrice the length of
+     * <code>str</code>.
+     *
+     * @param str a string to be written.
+     * @param out destination to write to
+     * @return The number of bytes written out.
+     * @exception IOException if an I/O error occurs.
+     */
+    static int writeUTF(String str, DataOutput out) throws IOException {
+        int strlen = str.length();
+        int utflen = 0;
+        int c, count = 0;
+
+        /* use charAt instead of copying String to char array */
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+        }
+
+        if (utflen > 65535) {
+            throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+        }
+
+        byte[] bytearr = null;
+        if (out instanceof DataOutputStream) {
+            DataOutputStream dos = (DataOutputStream) out;
+            if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
+                dos.bytearr = new byte[(utflen * 2) + 2];
+            }
+            bytearr = dos.bytearr;
+        } else {
+            bytearr = new byte[utflen + 2];
+        }
+
+        bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+        bytearr[count++] = (byte) ((utflen) & 0xFF);
+
+        int i = 0;
+        for (i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if (!((c >= 0x0001) && (c <= 0x007F))) {
+                break;
+            }
+            bytearr[count++] = (byte) c;
+        }
+
+        for (; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F)) {
+                bytearr[count++] = (byte) c;
+
+            } else if (c > 0x07FF) {
+                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+                bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
+            } else {
+                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+                bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
+            }
+        }
+        out.write(bytearr, 0, utflen + 2);
+        return utflen + 2;
+    }
+
+    /**
+     * Returns the current value of the counter <code>written</code>, the number
+     * of bytes written to this data output stream so far. If the counter
+     * overflows, it will be wrapped to Integer.MAX_VALUE.
+     *
+     * @return the value of the <code>written</code> field.
+     * @see java.io.DataOutputStream#written
+     */
+    public final int size() {
+        return written;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
new file mode 100644
index 0000000..2864bbb
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * <p>
+ * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the
+ * constructor to provide a compression level, and uses a default value of 1,
+ * rather than 5.
+ * </p>
+ */
+public class GZIPOutputStream extends java.util.zip.GZIPOutputStream {
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+
+    public GZIPOutputStream(final OutputStream out) throws IOException {
+        this(out, DEFAULT_COMPRESSION_LEVEL);
+    }
+
+    public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException {
+        super(out);
+        def.setLevel(compressionLevel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
new file mode 100644
index 0000000..bffbe26
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LeakyBucketStreamThrottler implements StreamThrottler {
+
+    private final int maxBytesPerSecond;
+    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<Request>();
+    private final ScheduledExecutorService executorService;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+    public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
+        this.maxBytesPerSecond = maxBytesPerSecond;
+
+        executorService = Executors.newSingleThreadScheduledExecutor();
+        final Runnable task = new Drain();
+        executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() {
+        this.shutdown.set(true);
+
+        executorService.shutdown();
+        try {
+            // Should not take more than 2 seconds because we run every second. If it takes more than
+            // 2 seconds, it is because the Runnable thread is blocking on a write; in this case,
+            // we will just ignore it and return
+            executorService.awaitTermination(2, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+        }
+    }
+
+    @Override
+    public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
+        return new OutputStream() {
+            @Override
+            public void write(final int b) throws IOException {
+                write(new byte[]{(byte) b}, 0, 1);
+            }
+
+            @Override
+            public void write(byte[] b) throws IOException {
+                write(b, 0, b.length);
+            }
+
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                final InputStream in = new ByteArrayInputStream(b, off, len);
+                LeakyBucketStreamThrottler.this.copy(in, toWrap);
+            }
+
+            @Override
+            public void close() throws IOException {
+                toWrap.close();
+            }
+
+            @Override
+            public void flush() throws IOException {
+                toWrap.flush();
+            }
+        };
+    }
+
+    @Override
+    public InputStream newThrottledInputStream(final InputStream toWrap) {
+        return new InputStream() {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            @Override
+            public int read() throws IOException {
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
+                LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
+                if (baos.getBufferLength() < 1) {
+                    return -1;
+                }
+
+                return baos.getUnderlyingBuffer()[0] & 0xFF;
+            }
+
+            @Override
+            public int read(final byte[] b) throws IOException {
+                if(b.length == 0){
+                    return 0;
+                }
+                return read(b, 0, b.length);
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                baos.reset();
+                final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
+                if (copied == 0) {
+                    return -1;
+                }
+                System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
+                return copied;
+            }
+
+            @Override
+            public void close() throws IOException {
+                toWrap.close();
+            }
+
+            @Override
+            public int available() throws IOException {
+                return toWrap.available();
+            }
+        };
+    }
+
+    @Override
+    public long copy(final InputStream in, final OutputStream out) throws IOException {
+        return copy(in, out, -1);
+    }
+
+    @Override
+    public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException {
+        long totalBytesCopied = 0;
+        boolean finished = false;
+        while (!finished) {
+            final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied;
+            final Request request = new Request(in, out, requestMax);
+            boolean transferred = false;
+            while (!transferred) {
+                if (shutdown.get()) {
+                    throw new IOException("Throttler shutdown");
+                }
+
+                try {
+                    transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS);
+                } catch (final InterruptedException e) {
+                    throw new IOException("Interrupted", e);
+                }
+            }
+
+            final BlockingQueue<Response> responseQueue = request.getResponseQueue();
+            Response response = null;
+            while (response == null) {
+                try {
+                    if (shutdown.get()) {
+                        throw new IOException("Throttler shutdown");
+                    }
+                    response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    throw new IOException("Interrupted", e);
+                }
+            }
+
+            if (!response.isSuccess()) {
+                throw response.getError();
+            }
+
+            totalBytesCopied += response.getBytesCopied();
+            finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0);
+        }
+
+        return totalBytesCopied;
+    }
+
+    /**
+     * This class is responsible for draining water from the leaky bucket. I.e.,
+     * it actually moves the data
+     */
+    private class Drain implements Runnable {
+
+        private final byte[] buffer;
+
+        public Drain() {
+            final int bufferSize = Math.min(4096, maxBytesPerSecond);
+            buffer = new byte[bufferSize];
+        }
+
+        @Override
+        public void run() {
+            final long start = System.currentTimeMillis();
+
+            int bytesTransferred = 0;
+            while (bytesTransferred < maxBytesPerSecond) {
+                final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start);
+                if (maxMillisToWait < 1) {
+                    return;
+                }
+
+                try {
+                    final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS);
+                    if (request == null) {
+                        return;
+                    }
+
+                    final BlockingQueue<Response> responseQueue = request.getResponseQueue();
+
+                    final OutputStream out = request.getOutputStream();
+                    final InputStream in = request.getInputStream();
+
+                    try {
+                        final long requestMax = request.getMaxBytesToCopy();
+                        long maxBytesToTransfer;
+                        if (requestMax < 0) {
+                            maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred);
+                        } else {
+                            maxBytesToTransfer = Math.min(requestMax,
+                                    Math.min(buffer.length, maxBytesPerSecond - bytesTransferred));
+                        }
+                        maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
+
+                        final int bytesCopied = fillBuffer(in, maxBytesToTransfer);
+                        out.write(buffer, 0, bytesCopied);
+
+                        final Response response = new Response(true, bytesCopied);
+                        responseQueue.put(response);
+                        bytesTransferred += bytesCopied;
+                    } catch (final IOException e) {
+                        final Response response = new Response(e);
+                        responseQueue.put(response);
+                    }
+                } catch (InterruptedException e) {
+                }
+            }
+        }
+
+        private int fillBuffer(final InputStream in, final long maxBytes) throws IOException {
+            int bytesRead = 0;
+            int len;
+            while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
+                bytesRead += len;
+            }
+
+            return bytesRead;
+        }
+    }
+
+    private static class Response {
+
+        private final boolean success;
+        private final IOException error;
+        private final int bytesCopied;
+
+        public Response(final boolean success, final int bytesCopied) {
+            this.success = success;
+            this.bytesCopied = bytesCopied;
+            this.error = null;
+        }
+
+        public Response(final IOException error) {
+            this.success = false;
+            this.error = error;
+            this.bytesCopied = -1;
+        }
+
+        public boolean isSuccess() {
+            return success;
+        }
+
+        public IOException getError() {
+            return error;
+        }
+
+        public int getBytesCopied() {
+            return bytesCopied;
+        }
+    }
+
+    private static class Request {
+
+        private final OutputStream out;
+        private final InputStream in;
+        private final long maxBytesToCopy;
+        private final BlockingQueue<Response> responseQueue;
+
+        public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) {
+            this.out = out;
+            this.in = in;
+            this.maxBytesToCopy = maxBytesToCopy;
+            this.responseQueue = new LinkedBlockingQueue<Response>(1);
+        }
+
+        public BlockingQueue<Response> getResponseQueue() {
+            return this.responseQueue;
+        }
+
+        public OutputStream getOutputStream() {
+            return out;
+        }
+
+        public InputStream getInputStream() {
+            return in;
+        }
+
+        public long getMaxBytesToCopy() {
+            return maxBytesToCopy;
+        }
+
+        @Override
+        public String toString() {
+            return "Request[maxBytes=" + maxBytesToCopy + "]";
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
new file mode 100644
index 0000000..0e75a22
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps and InputStream so that the underlying InputStream cannot be closed.
+ * This is used so that the InputStream can be wrapped with yet another
+ * InputStream and prevent the outer layer from closing the inner InputStream
+ */
+public class NonCloseableInputStream extends FilterInputStream {
+
+    private final InputStream toWrap;
+
+    public NonCloseableInputStream(final InputStream toWrap) {
+        super(toWrap);
+        this.toWrap = toWrap;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return toWrap.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return toWrap.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return toWrap.read(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
new file mode 100644
index 0000000..9c77637
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class NonCloseableOutputStream extends FilterOutputStream {
+
+    private final OutputStream out;
+
+    public NonCloseableOutputStream(final OutputStream out) {
+        super(out);
+        this.out = out;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    @Override
+    public void close() throws IOException {
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
new file mode 100644
index 0000000..8452761
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * OutputStream that throws away all data, just like as if writing to /dev/null
+ */
+public class NullOutputStream extends OutputStream {
+
+    @Override
+    public void write(final int b) throws IOException {
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+    }
+
+    @Override
+    public void write(final byte[] b, int off, int len) throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void flush() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
new file mode 100644
index 0000000..9158050
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface StreamThrottler extends Closeable {
+
+    long copy(InputStream in, OutputStream out) throws IOException;
+
+    long copy(InputStream in, OutputStream out, long maxBytes) throws IOException;
+
+    InputStream newThrottledInputStream(final InputStream toWrap);
+
+    OutputStream newThrottledOutputStream(final OutputStream toWrap);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
new file mode 100644
index 0000000..8e3d606
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.stream.io.exception.BytePatternNotFoundException;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+
+public class StreamUtils {
+
+    public static long copy(final InputStream source, final OutputStream destination) throws IOException {
+        final byte[] buffer = new byte[8192];
+        int len;
+        long totalCount = 0L;
+        while ((len = source.read(buffer)) > 0) {
+            destination.write(buffer, 0, len);
+            totalCount += len;
+        }
+        return totalCount;
+    }
+
+    /**
+     * Copies <code>numBytes</code> from <code>source</code> to
+     * <code>destination</code>. If <code>numBytes</code> are not available from
+     * <code>source</code>, throws EOFException
+     *
+     * @param source
+     * @param destination
+     * @param numBytes
+     * @throws IOException
+     */
+    public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException {
+        final byte[] buffer = new byte[8192];
+        int len;
+        long bytesLeft = numBytes;
+        while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) {
+            destination.write(buffer, 0, len);
+            bytesLeft -= len;
+        }
+
+        if (bytesLeft > 0) {
+            throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available");
+        }
+    }
+
+    /**
+     * Reads data from the given input stream, copying it to the destination
+     * byte array. If the InputStream has less data than the given byte array,
+     * throws an EOFException
+     *
+     * @param source
+     * @param destination
+     * @throws IOException
+     */
+    public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException {
+        fillBuffer(source, destination, true);
+    }
+
+    /**
+     * Reads data from the given input stream, copying it to the destination
+     * byte array. If the InputStream has less data than the given byte array,
+     * throws an EOFException if <code>ensureCapacity</code> is true and
+     * otherwise returns the number of bytes copied
+     *
+     * @param source
+     * @param destination
+     * @param ensureCapacity whether or not to enforce that the InputStream have
+     * at least as much data as the capacity of the destination byte array
+     * @return 
+     * @throws IOException
+     */
+    public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException {
+        int bytesRead = 0;
+        int len;
+        while (bytesRead < destination.length) {
+            len = source.read(destination, bytesRead, destination.length - bytesRead);
+            if (len < 0) {
+                if (ensureCapacity) {
+                    throw new EOFException();
+                } else {
+                    break;
+                }
+            }
+
+            bytesRead += len;
+        }
+
+        return bytesRead;
+    }
+
+    /**
+     * Copies data from in to out until either we are out of data (returns null)
+     * or we hit one of the byte patterns identified by the
+     * <code>stoppers</code> parameter (returns the byte pattern matched). The
+     * bytes in the stopper will be copied.
+     *
+     * @param in
+     * @param out
+     * @param maxBytes
+     * @param stoppers
+     * @return the byte array matched, or null if end of stream was reached
+     * @throws IOException
+     */
+    public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
+        if (stoppers.length == 0) {
+            return null;
+        }
+
+        final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
+        for (final byte[] stopper : stoppers) {
+            circularBuffers.add(new NonThreadSafeCircularBuffer(stopper));
+        }
+
+        long bytesRead = 0;
+        while (true) {
+            final int next = in.read();
+            if (next == -1) {
+                return null;
+            } else if (maxBytes > 0 && ++bytesRead >= maxBytes) {
+                throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
+            }
+
+            out.write(next);
+
+            for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
+                if (circ.addAndCompare((byte) next)) {
+                    return circ.getByteArray();
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies data from in to out until either we are out of data (returns null)
+     * or we hit one of the byte patterns identified by the
+     * <code>stoppers</code> parameter (returns the byte pattern matched). The
+     * byte pattern matched will NOT be copied to the output and will be un-read
+     * from the input.
+     *
+     * @param in
+     * @param out
+     * @param maxBytes
+     * @param stoppers
+     * @return the byte array matched, or null if end of stream was reached
+     * @throws IOException
+     */
+    public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
+        if (stoppers.length == 0) {
+            return null;
+        }
+
+        int longest = 0;
+        NonThreadSafeCircularBuffer longestBuffer = null;
+        final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
+        for (final byte[] stopper : stoppers) {
+            final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper);
+            if (stopper.length > longest) {
+                longest = stopper.length;
+                longestBuffer = circularBuffer;
+                circularBuffers.add(0, circularBuffer);
+            } else {
+                circularBuffers.add(circularBuffer);
+            }
+        }
+
+        long bytesRead = 0;
+        while (true) {
+            final int next = in.read();
+            if (next == -1) {
+                return null;
+            } else if (maxBytes > 0 && bytesRead++ > maxBytes) {
+                throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
+            }
+
+            for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
+                if (circ.addAndCompare((byte) next)) {
+                    // The longest buffer has some data that may not have been written out yet; we need to make sure
+                    // that we copy out those bytes.
+                    final int bytesToCopy = longest - circ.getByteArray().length;
+                    for (int i = 0; i < bytesToCopy; i++) {
+                        final int oldestByte = longestBuffer.getOldestByte();
+                        if (oldestByte != -1) {
+                            out.write(oldestByte);
+                            longestBuffer.addAndCompare((byte) 0);
+                        }
+                    }
+
+                    return circ.getByteArray();
+                }
+            }
+
+            if (longestBuffer.isFilled()) {
+                out.write(longestBuffer.getOldestByte());
+            }
+        }
+    }
+
+    /**
+     * Skips the specified number of bytes from the InputStream
+     *
+     * If unable to skip that number of bytes, throws EOFException
+     *
+     * @param stream
+     * @param bytesToSkip
+     * @throws IOException
+     */
+    public static void skip(final InputStream stream, final long bytesToSkip) throws IOException {
+        if (bytesToSkip <= 0) {
+            return;
+        }
+        long totalSkipped = 0L;
+
+        // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only
+        // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last
+        // byte in order to make sure that we've consumed the number of bytes requested. We then check that
+        // the final byte, which we read, is not -1.
+        final long actualBytesToSkip = bytesToSkip - 1;
+        while (totalSkipped < actualBytesToSkip) {
+            final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped);
+            if (skippedThisIteration == 0) {
+                final int nextByte = stream.read();
+                if (nextByte == -1) {
+                    throw new EOFException();
+                } else {
+                    totalSkipped++;
+                }
+            }
+
+            totalSkipped += skippedThisIteration;
+        }
+
+        final int lastByte = stream.read();
+        if (lastByte == -1) {
+            throw new EOFException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
new file mode 100644
index 0000000..2b9050d
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.OutputStream;
+
+/**
+ * This class extends the {@link java.util.zip.ZipOutputStream} by providing a
+ * constructor that allows the user to specify the compression level. The
+ * default compression level is 1, as opposed to Java's default of 5.
+ */
+public class ZipOutputStream extends java.util.zip.ZipOutputStream {
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+
+    public ZipOutputStream(final OutputStream out) {
+        this(out, DEFAULT_COMPRESSION_LEVEL);
+    }
+
+    public ZipOutputStream(final OutputStream out, final int compressionLevel) {
+        super(out);
+        def.setLevel(compressionLevel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
new file mode 100644
index 0000000..5d08616
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.exception;
+
+import java.io.IOException;
+
+public class BytePatternNotFoundException extends IOException {
+
+    private static final long serialVersionUID = -4128911284318513973L;
+
+    public BytePatternNotFoundException(final String explanation) {
+        super(explanation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
new file mode 100644
index 0000000..b4b4c17
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.util.Arrays;
+
+public class NonThreadSafeCircularBuffer {
+
+    private final byte[] lookingFor;
+    private final int[] buffer;
+    private int insertionPointer = 0;
+    private int bufferSize = 0;
+
+    public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
+        this.lookingFor = lookingFor;
+        buffer = new int[lookingFor.length];
+        Arrays.fill(buffer, -1);
+    }
+
+    public byte[] getByteArray() {
+        return lookingFor;
+    }
+
+    /**
+     * Returns the oldest byte in the buffer
+     *
+     * @return
+     */
+    public int getOldestByte() {
+        return buffer[insertionPointer];
+    }
+
+    public boolean isFilled() {
+        return bufferSize >= buffer.length;
+    }
+
+    public boolean addAndCompare(final byte data) {
+        buffer[insertionPointer] = data;
+        insertionPointer = (insertionPointer + 1) % lookingFor.length;
+
+        bufferSize++;
+        if (bufferSize < lookingFor.length) {
+            return false;
+        }
+
+        for (int i = 0; i < lookingFor.length; i++) {
+            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+            if (compare != lookingFor[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
new file mode 100644
index 0000000..85bfd96
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.Arrays;
+
+/**
+ * <p>
+ * A RingBuffer that can be used to scan byte sequences for subsequences.
+ * </p>
+ *
+ * <p>
+ * This class implements an efficient naive search algorithm, which allows the
+ * user of the library to identify byte sequences in a stream on-the-fly so that
+ * the stream can be segmented without having to buffer the data.
+ * </p>
+ *
+ * <p>
+ * The intended usage paradigm is:
+ * <code>
+ * <pre>
+ * final byte[] searchSequence = ...;
+ * final CircularBuffer buffer = new CircularBuffer(searchSequence);
+ * while ((int nextByte = in.read()) > 0) {
+ *      if ( buffer.addAndCompare(nextByte) ) {
+ *          // This byte is the last byte in the given sequence
+ *      } else {
+ *          // This byte does not complete the given sequence
+ *      }
+ * }
+ * </pre>
+ * </code>
+ * </p>
+ */
+public class NaiveSearchRingBuffer {
+
+    private final byte[] lookingFor;
+    private final int[] buffer;
+    private int insertionPointer = 0;
+    private int bufferSize = 0;
+
+    public NaiveSearchRingBuffer(final byte[] lookingFor) {
+        this.lookingFor = lookingFor;
+        this.buffer = new int[lookingFor.length];
+        Arrays.fill(buffer, -1);
+    }
+
+    /**
+     * Returns the contents of the internal buffer, which represents the last X
+     * bytes added to the buffer, where X is the minimum of the number of bytes
+     * added to the buffer or the length of the byte sequence for which we are
+     * looking
+     *
+     * @return
+     */
+    public byte[] getBufferContents() {
+        final int contentLength = Math.min(lookingFor.length, bufferSize);
+        final byte[] contents = new byte[contentLength];
+        for (int i = 0; i < contentLength; i++) {
+            final byte nextByte = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+            contents[i] = nextByte;
+        }
+        return contents;
+    }
+
+    /**
+     * Returns the oldest byte in the buffer
+     *
+     * @return
+     */
+    public int getOldestByte() {
+        return buffer[insertionPointer];
+    }
+
+    /**
+     * Returns <code>true</code> if the number of bytes that have been added to
+     * the buffer is at least equal to the length of the byte sequence for which
+     * we are searching
+     *
+     * @return
+     */
+    public boolean isFilled() {
+        return bufferSize >= buffer.length;
+    }
+
+    /**
+     * Clears the internal buffer so that a new search may begin
+     */
+    public void clear() {
+        Arrays.fill(buffer, -1);
+        insertionPointer = 0;
+        bufferSize = 0;
+    }
+
+    /**
+     * Add the given byte to the buffer and notify whether or not the byte
+     * completes the desired byte sequence.
+     *
+     * @param data
+     * @return <code>true</code> if this byte completes the byte sequence,
+     * <code>false</code> otherwise.
+     */
+    public boolean addAndCompare(final byte data) {
+        buffer[insertionPointer] = data;
+        insertionPointer = (insertionPointer + 1) % lookingFor.length;
+
+        bufferSize++;
+        if (bufferSize < lookingFor.length) {
+            return false;
+        }
+
+        for (int i = 0; i < lookingFor.length; i++) {
+            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+            if (compare != lookingFor[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}