You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/20 06:52:27 UTC
[5/8] incubator-nifi git commit: NIFI-189 Collapsed several commons
libs into one
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
new file mode 100644
index 0000000..284cd54
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.InputStream;
+
+/**
+ * This class performs the same function as java.io.ByteArrayInputStream but
+ * does not mark its methods as synchronized
+ */
+public class ByteArrayInputStream extends InputStream {
+
+ /**
+ * An array of bytes that was provided by the creator of the stream.
+ * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the
+ * only bytes that can ever be read from the stream; element
+ * <code>buf[pos]</code> is the next byte to be read.
+ */
+ protected byte buf[];
+
+ /**
+ * The index of the next character to read from the input stream buffer.
+ * This value should always be nonnegative and not larger than the value of
+ * <code>count</code>. The next byte to be read from the input stream buffer
+ * will be <code>buf[pos]</code>.
+ */
+ protected int pos;
+
+ /**
+ * The currently marked position in the stream. ByteArrayInputStream objects
+ * are marked at position zero by default when constructed. They may be
+ * marked at another position within the buffer by the <code>mark()</code>
+ * method. The current buffer position is set to this point by the
+ * <code>reset()</code> method.
+ * <p>
+ * If no mark has been set, then the value of mark is the offset passed to
+ * the constructor (or 0 if the offset was not supplied).
+ *
+ * @since JDK1.1
+ */
+ protected int mark = 0;
+
+ /**
+ * The index one greater than the last valid character in the input stream
+ * buffer. This value should always be nonnegative and not larger than the
+ * length of <code>buf</code>. It is one greater than the position of the
+ * last byte within <code>buf</code> that can ever be read from the input
+ * stream buffer.
+ */
+ protected int count;
+
+ /**
+ * Creates a <code>ByteArrayInputStream</code> so that it uses
+ * <code>buf</code> as its buffer array. The buffer array is not copied. The
+ * initial value of <code>pos</code> is <code>0</code> and the initial value
+ * of <code>count</code> is the length of <code>buf</code>.
+ *
+ * @param buf the input buffer.
+ */
+ public ByteArrayInputStream(byte buf[]) {
+ this.buf = buf;
+ this.pos = 0;
+ this.count = buf.length;
+ }
+
+ /**
+ * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as
+ * its buffer array. The initial value of <code>pos</code> is
+ * <code>offset</code> and the initial value of <code>count</code> is the
+ * minimum of <code>offset+length</code> and <code>buf.length</code>. The
+ * buffer array is not copied. The buffer's mark is set to the specified
+ * offset.
+ *
+ * @param buf the input buffer.
+ * @param offset the offset in the buffer of the first byte to read.
+ * @param length the maximum number of bytes to read from the buffer.
+ */
+ public ByteArrayInputStream(byte buf[], int offset, int length) {
+ this.buf = buf;
+ this.pos = offset;
+ this.count = Math.min(offset + length, buf.length);
+ this.mark = offset;
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is
+ * returned as an <code>int</code> in the range <code>0</code> to
+ * <code>255</code>. If no byte is available because the end of the stream
+ * has been reached, the value <code>-1</code> is returned.
+ * <p>
+ * This <code>read</code> method cannot block.
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the
+ * stream has been reached.
+ */
+ @Override
+ public int read() {
+ return (pos < count) ? (buf[pos++] & 0xff) : -1;
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes of data into an array of bytes from
+ * this input stream. If <code>pos</code> equals <code>count</code>, then
+ * <code>-1</code> is returned to indicate end of file. Otherwise, the
+ * number <code>k</code> of bytes read is equal to the smaller of
+ * <code>len</code> and <code>count-pos</code>. If <code>k</code> is
+ * positive, then bytes <code>buf[pos]</code> through
+ * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through
+ * <code>b[off+k-1]</code> in the manner performed by
+ * <code>System.arraycopy</code>. The value <code>k</code> is added into
+ * <code>pos</code> and <code>k</code> is returned.
+ * <p>
+ * This <code>read</code> method cannot block.
+ *
+ * @param b the buffer into which the data is read.
+ * @param off the start offset in the destination array <code>b</code>
+ * @param len the maximum number of bytes read.
+ * @return the total number of bytes read into the buffer, or
+ * <code>-1</code> if there is no more data because the end of the stream
+ * has been reached.
+ * @exception NullPointerException If <code>b</code> is <code>null</code>.
+ * @exception IndexOutOfBoundsException If <code>off</code> is negative,
+ * <code>len</code> is negative, or <code>len</code> is greater than
+ * <code>b.length - off</code>
+ */
+ @Override
+ public int read(byte b[], int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ if (pos >= count) {
+ return -1;
+ }
+
+ int avail = count - pos;
+ if (len > avail) {
+ len = avail;
+ }
+ if (len <= 0) {
+ return 0;
+ }
+ System.arraycopy(buf, pos, b, off, len);
+ pos += len;
+ return len;
+ }
+
+ /**
+ * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
+ * might be skipped if the end of the input stream is reached. The actual
+ * number <code>k</code> of bytes to be skipped is equal to the smaller of
+ * <code>n</code> and <code>count-pos</code>. The value <code>k</code> is
+ * added into <code>pos</code> and <code>k</code> is returned.
+ *
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ */
+ @Override
+ public long skip(long n) {
+ long k = count - pos;
+ if (n < k) {
+ k = n < 0 ? 0 : n;
+ }
+
+ pos += k;
+ return k;
+ }
+
+ /**
+ * Returns the number of remaining bytes that can be read (or skipped over)
+ * from this input stream.
+ * <p>
+ * The value returned is <code>count - pos</code>, which is the number
+ * of bytes remaining to be read from the input buffer.
+ *
+ * @return the number of remaining bytes that can be read (or skipped over)
+ * from this input stream without blocking.
+ */
+ @Override
+ public int available() {
+ return count - pos;
+ }
+
+ /**
+ * Tests if this <code>InputStream</code> supports mark/reset. The
+ * <code>markSupported</code> method of <code>ByteArrayInputStream</code>
+ * always returns <code>true</code>.
+ *
+ * @since JDK1.1
+ */
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ /**
+ * Set the current marked position in the stream. ByteArrayInputStream
+ * objects are marked at position zero by default when constructed. They may
+ * be marked at another position within the buffer by this method.
+ * <p>
+ * If no mark has been set, then the value of the mark is the offset passed
+ * to the constructor (or 0 if the offset was not supplied).
+ *
+ * <p>
+ * Note: The <code>readAheadLimit</code> for this class has no meaning.
+ *
+ * @since JDK1.1
+ */
+ @Override
+ public void mark(int readAheadLimit) {
+ mark = pos;
+ }
+
+ /**
+ * Resets the buffer to the marked position. The marked position is 0 unless
+ * another position was marked or an offset was specified in the
+ * constructor.
+ */
+ @Override
+ public void reset() {
+ pos = mark;
+ }
+
+ /**
+ * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
+ * this class can be called after the stream has been closed without
+ * generating an <tt>IOException</tt>.
+ * <p>
+ */
+ @Override
+ public void close() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
new file mode 100644
index 0000000..459563b
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * This class provides a more efficient implementation of the
+ * java.io.ByteArrayOutputStream. The efficiency is gained in two ways:
+ * <ul>
+ * <li>The write methods are not synchronized</li>
+ * <li>The class provides {@link #getUnderlyingBuffer()} and
+ * {@link #getBufferLength()}, which can be used to access the underlying byte
+ * array directly, rather than the System.arraycopy that {@link #toByteArray()}
+ * uses
+ * </ul>
+ *
+ */
+public class ByteArrayOutputStream extends OutputStream {
+
+ /**
+ * The buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer.
+ */
+ protected int count;
+
+ /**
+ * Creates a new byte array output stream. The buffer capacity is initially
+ * 32 bytes, though its size increases if necessary.
+ */
+ public ByteArrayOutputStream() {
+ this(32);
+ }
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of the
+ * specified size, in bytes.
+ *
+ * @param size the initial size.
+ * @exception IllegalArgumentException if size is negative.
+ */
+ public ByteArrayOutputStream(int size) {
+ if (size < 0) {
+ throw new IllegalArgumentException("Negative initial size: "
+ + size);
+ }
+ buf = new byte[size];
+ }
+
+ /**
+ * Increases the capacity if necessary to ensure that it can hold at least
+ * the number of elements specified by the minimum capacity argument.
+ *
+ * @param minCapacity the desired minimum capacity
+ * @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted
+ * as a request for the unsatisfiably large capacity
+ * {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
+ */
+ private void ensureCapacity(int minCapacity) {
+ // overflow-conscious code
+ if (minCapacity - buf.length > 0) {
+ grow(minCapacity);
+ }
+ }
+
+ /**
+ * Increases the capacity to ensure that it can hold at least the number of
+ * elements specified by the minimum capacity argument.
+ *
+ * @param minCapacity the desired minimum capacity
+ */
+ private void grow(int minCapacity) {
+ // overflow-conscious code
+ int oldCapacity = buf.length;
+ int newCapacity = oldCapacity << 1;
+ if (newCapacity - minCapacity < 0) {
+ newCapacity = minCapacity;
+ }
+ if (newCapacity < 0) {
+ if (minCapacity < 0) // overflow
+ {
+ throw new OutOfMemoryError();
+ }
+ newCapacity = Integer.MAX_VALUE;
+ }
+ buf = Arrays.copyOf(buf, newCapacity);
+ }
+
+ /**
+ * Writes the specified byte to this byte array output stream.
+ *
+ * @param b the byte to be written.
+ */
+ @Override
+ public void write(int b) {
+ ensureCapacity(count + 1);
+ buf[count] = (byte) b;
+ count += 1;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this byte array output stream.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ */
+ @Override
+ public void write(byte b[], int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0)
+ || ((off + len) - b.length > 0)) {
+ throw new IndexOutOfBoundsException();
+ }
+ ensureCapacity(count + len);
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ /**
+ * Writes the complete contents of this byte array output stream to the
+ * specified output stream argument, as if by calling the output stream's
+ * write method using <code>out.write(buf, 0, count)</code>.
+ *
+ * @param out the output stream to which to write the data.
+ * @exception IOException if an I/O error occurs.
+ */
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(buf, 0, count);
+ }
+
+ /**
+ * Resets the <code>count</code> field of this byte array output stream to
+ * zero, so that all currently accumulated output in the output stream is
+ * discarded. The output stream can be used again, reusing the already
+ * allocated buffer space.
+ *
+ * @see java.io.ByteArrayInputStream#count
+ */
+ public void reset() {
+ count = 0;
+ }
+
+ /**
+ * Creates a newly allocated byte array. Its size is the current size of
+ * this output stream and the valid contents of the buffer have been copied
+ * into it.
+ *
+ * @return the current contents of this output stream, as a byte array.
+ * @see java.io.ByteArrayOutputStream#size()
+ */
+ public byte toByteArray ()
+ [] {
+ return Arrays.copyOf(buf, count);
+ }
+
+ /**
+ * Returns the current size of the buffer.
+ *
+ * @return the value of the <code>count</code> field, which is the number of
+ * valid bytes in this output stream.
+ * @see java.io.ByteArrayOutputStream#count
+ */
+ public int size() {
+ return count;
+ }
+
+ /**
+ * Converts the buffer's contents into a string decoding bytes using the
+ * platform's default character set. The length of the new <tt>String</tt>
+ * is a function of the character set, and hence may not be equal to the
+ * size of the buffer.
+ *
+ * <p>
+ * This method always replaces malformed-input and unmappable-character
+ * sequences with the default replacement string for the platform's default
+ * character set. The {@linkplain java.nio.charset.CharsetDecoder} class
+ * should be used when more control over the decoding process is required.
+ *
+ * @return String decoded from the buffer's contents.
+ * @since JDK1.1
+ */
+ @Override
+ public String toString() {
+ return new String(buf, 0, count);
+ }
+
+ /**
+ * Converts the buffer's contents into a string by decoding the bytes using
+ * the specified {@link java.nio.charset.Charset charsetName}. The length of
+ * the new <tt>String</tt> is a function of the charset, and hence may not
+ * be equal to the length of the byte array.
+ *
+ * <p>
+ * This method always replaces malformed-input and unmappable-character
+ * sequences with this charset's default replacement string. The {@link
+ * java.nio.charset.CharsetDecoder} class should be used when more control
+ * over the decoding process is required.
+ *
+ * @param charsetName the name of a supported
+ * {@linkplain java.nio.charset.Charset <code>charset</code>}
+ * @return String decoded from the buffer's contents.
+ * @exception UnsupportedEncodingException If the named charset is not
+ * supported
+ * @since JDK1.1
+ */
+ public String toString(String charsetName) throws UnsupportedEncodingException {
+ return new String(buf, 0, count, charsetName);
+ }
+
+ /**
+ * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
+ * this class can be called after the stream has been closed without
+ * generating an <tt>IOException</tt>.
+ * <p>
+ *
+ */
+ @Override
+ public void close() {
+ }
+
+ public byte[] getUnderlyingBuffer() {
+ return buf;
+ }
+
+ public int getBufferLength() {
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
new file mode 100644
index 0000000..8294af3
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ByteCountingInputStream extends InputStream {
+
+ private final InputStream in;
+ private long bytesRead = 0L;
+ private long bytesSkipped = 0L;
+
+ private long bytesSinceMark = 0L;
+
+ public ByteCountingInputStream(final InputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public int read() throws IOException {
+ final int fromSuper = in.read();
+ if (fromSuper >= 0) {
+ bytesRead++;
+ bytesSinceMark++;
+ }
+ return fromSuper;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ final int fromSuper = in.read(b, off, len);
+ if (fromSuper >= 0) {
+ bytesRead += fromSuper;
+ bytesSinceMark += fromSuper;
+ }
+
+ return fromSuper;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public long skip(final long n) throws IOException {
+ final long skipped = in.skip(n);
+ if (skipped >= 0) {
+ bytesSkipped += skipped;
+ bytesSinceMark += skipped;
+ }
+ return skipped;
+ }
+
+ public long getBytesRead() {
+ return bytesRead;
+ }
+
+ public long getBytesSkipped() {
+ return bytesSkipped;
+ }
+
+ public long getBytesConsumed() {
+ return getBytesRead() + getBytesSkipped();
+ }
+
+ @Override
+ public void mark(final int readlimit) {
+ in.mark(readlimit);
+
+ bytesSinceMark = 0L;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ in.reset();
+ bytesRead -= bytesSinceMark;
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
new file mode 100644
index 0000000..d8e1a42
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ByteCountingOutputStream extends OutputStream {
+
+ private final OutputStream out;
+ private long bytesWritten = 0L;
+
+ public ByteCountingOutputStream(final OutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ bytesWritten++;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ ;
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ bytesWritten += len;
+ }
+
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
new file mode 100644
index 0000000..1dd90f5
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.DataOutput;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+
+/**
+ * This class is different from java.io.DataOutputStream in that it does
+ * synchronize on its methods.
+ */
+public class DataOutputStream extends FilterOutputStream implements DataOutput {
+
+ /**
+ * The number of bytes written to the data output stream so far. If this
+ * counter overflows, it will be wrapped to Integer.MAX_VALUE.
+ */
+ protected int written;
+
+ /**
+ * bytearr is initialized on demand by writeUTF
+ */
+ private byte[] bytearr = null;
+
+ /**
+ * Creates a new data output stream to write data to the specified
+ * underlying output stream. The counter <code>written</code> is set to
+ * zero.
+ *
+ * @param out the underlying output stream, to be saved for later use.
+ * @see java.io.FilterOutputStream#out
+ */
+ public DataOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ /**
+ * Increases the written counter by the specified value until it reaches
+ * Integer.MAX_VALUE.
+ */
+ private void incCount(int value) {
+ int temp = written + value;
+ if (temp < 0) {
+ temp = Integer.MAX_VALUE;
+ }
+ written = temp;
+ }
+
+ /**
+ * Writes the specified byte (the low eight bits of the argument
+ * <code>b</code>) to the underlying output stream. If no exception is
+ * thrown, the counter <code>written</code> is incremented by
+ * <code>1</code>.
+ * <p>
+ * Implements the <code>write</code> method of <code>OutputStream</code>.
+ *
+ * @param b the <code>byte</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ incCount(1);
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to the underlying output stream. If no exception
+ * is thrown, the counter <code>written</code> is incremented by
+ * <code>len</code>.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ out.write(b, off, len);
+ incCount(len);
+ }
+
+ /**
+ * Flushes this data output stream. This forces any buffered output bytes to
+ * be written out to the stream.
+ * <p>
+ * The <code>flush</code> method of <code>DataOutputStream</code> calls the
+ * <code>flush</code> method of its underlying output stream.
+ *
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ * @see java.io.OutputStream#flush()
+ */
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ /**
+ * Writes a <code>boolean</code> to the underlying output stream as a 1-byte
+ * value. The value <code>true</code> is written out as the value
+ * <code>(byte)1</code>; the value <code>false</code> is written out as the
+ * value <code>(byte)0</code>. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>1</code>.
+ *
+ * @param v a <code>boolean</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeBoolean(boolean v) throws IOException {
+ out.write(v ? 1 : 0);
+ incCount(1);
+ }
+
+ /**
+ * Writes out a <code>byte</code> to the underlying output stream as a
+ * 1-byte value. If no exception is thrown, the counter <code>written</code>
+ * is incremented by <code>1</code>.
+ *
+ * @param v a <code>byte</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeByte(int v) throws IOException {
+ out.write(v);
+ incCount(1);
+ }
+
+ /**
+ * Writes a <code>short</code> to the underlying output stream as two bytes,
+ * high byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>2</code>.
+ *
+ * @param v a <code>short</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeShort(int v) throws IOException {
+ out.write((v >>> 8) & 0xFF);
+ out.write((v) & 0xFF);
+ incCount(2);
+ }
+
+ /**
+ * Writes a <code>char</code> to the underlying output stream as a 2-byte
+ * value, high byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>2</code>.
+ *
+ * @param v a <code>char</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeChar(int v) throws IOException {
+ out.write((v >>> 8) & 0xFF);
+ out.write((v) & 0xFF);
+ incCount(2);
+ }
+
+ /**
+ * Writes an <code>int</code> to the underlying output stream as four bytes,
+ * high byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>4</code>.
+ *
+ * @param v an <code>int</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeInt(int v) throws IOException {
+ out.write((v >>> 24) & 0xFF);
+ out.write((v >>> 16) & 0xFF);
+ out.write((v >>> 8) & 0xFF);
+ out.write((v) & 0xFF);
+ incCount(4);
+ }
+
+ private final byte writeBuffer[] = new byte[8];
+
+ /**
+ * Writes a <code>long</code> to the underlying output stream as eight
+ * bytes, high byte first. In no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>8</code>.
+ *
+ * @param v a <code>long</code> to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeLong(long v) throws IOException {
+ writeBuffer[0] = (byte) (v >>> 56);
+ writeBuffer[1] = (byte) (v >>> 48);
+ writeBuffer[2] = (byte) (v >>> 40);
+ writeBuffer[3] = (byte) (v >>> 32);
+ writeBuffer[4] = (byte) (v >>> 24);
+ writeBuffer[5] = (byte) (v >>> 16);
+ writeBuffer[6] = (byte) (v >>> 8);
+ writeBuffer[7] = (byte) (v);
+ out.write(writeBuffer, 0, 8);
+ incCount(8);
+ }
+
+ /**
+ * Converts the float argument to an <code>int</code> using the
+ * <code>floatToIntBits</code> method in class <code>Float</code>, and then
+ * writes that <code>int</code> value to the underlying output stream as a
+ * 4-byte quantity, high byte first. If no exception is thrown, the counter
+ * <code>written</code> is incremented by <code>4</code>.
+ *
+ * @param v a <code>float</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ * @see java.lang.Float#floatToIntBits(float)
+ */
+ @Override
+ public final void writeFloat(float v) throws IOException {
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ /**
+ * Converts the double argument to a <code>long</code> using the
+ * <code>doubleToLongBits</code> method in class <code>Double</code>, and
+ * then writes that <code>long</code> value to the underlying output stream
+ * as an 8-byte quantity, high byte first. If no exception is thrown, the
+ * counter <code>written</code> is incremented by <code>8</code>.
+ *
+ * @param v a <code>double</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ * @see java.lang.Double#doubleToLongBits(double)
+ */
+ @Override
+ public final void writeDouble(double v) throws IOException {
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ /**
+ * Writes out the string to the underlying output stream as a sequence of
+ * bytes. Each character in the string is written out, in sequence, by
+ * discarding its high eight bits. If no exception is thrown, the counter
+ * <code>written</code> is incremented by the length of <code>s</code>.
+ *
+ * @param s a string of bytes to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeBytes(String s) throws IOException {
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ out.write((byte) s.charAt(i));
+ }
+ incCount(len);
+ }
+
+ /**
+ * Writes a string to the underlying output stream as a sequence of
+ * characters. Each character is written to the data output stream as if by
+ * the <code>writeChar</code> method. If no exception is thrown, the counter
+ * <code>written</code> is incremented by twice the length of
+ * <code>s</code>.
+ *
+ * @param s a <code>String</code> value to be written.
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.DataOutputStream#writeChar(int)
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public final void writeChars(String s) throws IOException {
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ int v = s.charAt(i);
+ out.write((v >>> 8) & 0xFF);
+ out.write((v) & 0xFF);
+ }
+ incCount(len * 2);
+ }
+
+ /**
+ * Writes a string to the underlying output stream using
+ * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
+ * encoding in a machine-independent manner.
+ * <p>
+ * First, two bytes are written to the output stream as if by the
+ * <code>writeShort</code> method giving the number of bytes to follow. This
+ * value is the number of bytes actually written out, not the length of the
+ * string. Following the length, each character of the string is output, in
+ * sequence, using the modified UTF-8 encoding for the character. If no
+ * exception is thrown, the counter <code>written</code> is incremented by
+ * the total number of bytes written to the output stream. This will be at
+ * least two plus the length of <code>str</code>, and at most two plus
+ * thrice the length of <code>str</code>.
+ *
+ * @param str a string to be written.
+ * @exception IOException if an I/O error occurs.
+ */
+ @Override
+ public final void writeUTF(String str) throws IOException {
+ writeUTF(str, this);
+ }
+
+ /**
+ * Writes a string to the specified DataOutput using
+ * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
+ * encoding in a machine-independent manner.
+ * <p>
+ * First, two bytes are written to out as if by the <code>writeShort</code>
+ * method giving the number of bytes to follow. This value is the number of
+ * bytes actually written out, not the length of the string. Following the
+ * length, each character of the string is output, in sequence, using the
+ * modified UTF-8 encoding for the character. If no exception is thrown, the
+ * counter <code>written</code> is incremented by the total number of bytes
+ * written to the output stream. This will be at least two plus the length
+ * of <code>str</code>, and at most two plus thrice the length of
+ * <code>str</code>.
+ *
+ * @param str a string to be written.
+ * @param out destination to write to
+ * @return The number of bytes written out.
+ * @exception IOException if an I/O error occurs.
+ */
+ static int writeUTF(String str, DataOutput out) throws IOException {
+ int strlen = str.length();
+ int utflen = 0;
+ int c, count = 0;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ if (utflen > 65535) {
+ throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
+ }
+
+ byte[] bytearr = null;
+ if (out instanceof DataOutputStream) {
+ DataOutputStream dos = (DataOutputStream) out;
+ if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
+ dos.bytearr = new byte[(utflen * 2) + 2];
+ }
+ bytearr = dos.bytearr;
+ } else {
+ bytearr = new byte[utflen + 2];
+ }
+
+ bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+ bytearr[count++] = (byte) ((utflen) & 0xFF);
+
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = str.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ bytearr[count++] = (byte) c;
+ }
+
+ for (; i < strlen; i++) {
+ c = str.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ bytearr[count++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
+ }
+ }
+ out.write(bytearr, 0, utflen + 2);
+ return utflen + 2;
+ }
+
+ /**
+ * Returns the current value of the counter <code>written</code>, the number
+ * of bytes written to this data output stream so far. If the counter
+ * overflows, it will be wrapped to Integer.MAX_VALUE.
+ *
+ * @return the value of the <code>written</code> field.
+ * @see java.io.DataOutputStream#written
+ */
+ public final int size() {
+ return written;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
new file mode 100644
index 0000000..2864bbb
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * <p>
+ * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the
+ * constructor to provide a compression level, and uses a default value of 1,
+ * rather than 5.
+ * </p>
+ */
+public class GZIPOutputStream extends java.util.zip.GZIPOutputStream {
+
+ public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+
+ public GZIPOutputStream(final OutputStream out) throws IOException {
+ this(out, DEFAULT_COMPRESSION_LEVEL);
+ }
+
+ public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException {
+ super(out);
+ def.setLevel(compressionLevel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
new file mode 100644
index 0000000..bffbe26
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LeakyBucketStreamThrottler.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class LeakyBucketStreamThrottler implements StreamThrottler {
+
+ private final int maxBytesPerSecond;
+ private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<Request>();
+ private final ScheduledExecutorService executorService;
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ public LeakyBucketStreamThrottler(final int maxBytesPerSecond) {
+ this.maxBytesPerSecond = maxBytesPerSecond;
+
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ final Runnable task = new Drain();
+ executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ this.shutdown.set(true);
+
+ executorService.shutdown();
+ try {
+ // Should not take more than 2 seconds because we run every second. If it takes more than
+ // 2 seconds, it is because the Runnable thread is blocking on a write; in this case,
+ // we will just ignore it and return
+ executorService.awaitTermination(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ @Override
+ public OutputStream newThrottledOutputStream(final OutputStream toWrap) {
+ return new OutputStream() {
+ @Override
+ public void write(final int b) throws IOException {
+ write(new byte[]{(byte) b}, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ final InputStream in = new ByteArrayInputStream(b, off, len);
+ LeakyBucketStreamThrottler.this.copy(in, toWrap);
+ }
+
+ @Override
+ public void close() throws IOException {
+ toWrap.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ toWrap.flush();
+ }
+ };
+ }
+
+ @Override
+ public InputStream newThrottledInputStream(final InputStream toWrap) {
+ return new InputStream() {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ @Override
+ public int read() throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(1);
+ LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L);
+ if (baos.getBufferLength() < 1) {
+ return -1;
+ }
+
+ return baos.getUnderlyingBuffer()[0] & 0xFF;
+ }
+
+ @Override
+ public int read(final byte[] b) throws IOException {
+ if(b.length == 0){
+ return 0;
+ }
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ baos.reset();
+ final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len);
+ if (copied == 0) {
+ return -1;
+ }
+ System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied);
+ return copied;
+ }
+
+ @Override
+ public void close() throws IOException {
+ toWrap.close();
+ }
+
+ @Override
+ public int available() throws IOException {
+ return toWrap.available();
+ }
+ };
+ }
+
+ @Override
+ public long copy(final InputStream in, final OutputStream out) throws IOException {
+ return copy(in, out, -1);
+ }
+
+ @Override
+ public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException {
+ long totalBytesCopied = 0;
+ boolean finished = false;
+ while (!finished) {
+ final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied;
+ final Request request = new Request(in, out, requestMax);
+ boolean transferred = false;
+ while (!transferred) {
+ if (shutdown.get()) {
+ throw new IOException("Throttler shutdown");
+ }
+
+ try {
+ transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ throw new IOException("Interrupted", e);
+ }
+ }
+
+ final BlockingQueue<Response> responseQueue = request.getResponseQueue();
+ Response response = null;
+ while (response == null) {
+ try {
+ if (shutdown.get()) {
+ throw new IOException("Throttler shutdown");
+ }
+ response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted", e);
+ }
+ }
+
+ if (!response.isSuccess()) {
+ throw response.getError();
+ }
+
+ totalBytesCopied += response.getBytesCopied();
+ finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0);
+ }
+
+ return totalBytesCopied;
+ }
+
+ /**
+ * This class is responsible for draining water from the leaky bucket. I.e.,
+ * it actually moves the data
+ */
+ private class Drain implements Runnable {
+
+ private final byte[] buffer;
+
+ public Drain() {
+ final int bufferSize = Math.min(4096, maxBytesPerSecond);
+ buffer = new byte[bufferSize];
+ }
+
+ @Override
+ public void run() {
+ final long start = System.currentTimeMillis();
+
+ int bytesTransferred = 0;
+ while (bytesTransferred < maxBytesPerSecond) {
+ final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start);
+ if (maxMillisToWait < 1) {
+ return;
+ }
+
+ try {
+ final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS);
+ if (request == null) {
+ return;
+ }
+
+ final BlockingQueue<Response> responseQueue = request.getResponseQueue();
+
+ final OutputStream out = request.getOutputStream();
+ final InputStream in = request.getInputStream();
+
+ try {
+ final long requestMax = request.getMaxBytesToCopy();
+ long maxBytesToTransfer;
+ if (requestMax < 0) {
+ maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred);
+ } else {
+ maxBytesToTransfer = Math.min(requestMax,
+ Math.min(buffer.length, maxBytesPerSecond - bytesTransferred));
+ }
+ maxBytesToTransfer = Math.max(1L, maxBytesToTransfer);
+
+ final int bytesCopied = fillBuffer(in, maxBytesToTransfer);
+ out.write(buffer, 0, bytesCopied);
+
+ final Response response = new Response(true, bytesCopied);
+ responseQueue.put(response);
+ bytesTransferred += bytesCopied;
+ } catch (final IOException e) {
+ final Response response = new Response(e);
+ responseQueue.put(response);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private int fillBuffer(final InputStream in, final long maxBytes) throws IOException {
+ int bytesRead = 0;
+ int len;
+ while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) {
+ bytesRead += len;
+ }
+
+ return bytesRead;
+ }
+ }
+
+ private static class Response {
+
+ private final boolean success;
+ private final IOException error;
+ private final int bytesCopied;
+
+ public Response(final boolean success, final int bytesCopied) {
+ this.success = success;
+ this.bytesCopied = bytesCopied;
+ this.error = null;
+ }
+
+ public Response(final IOException error) {
+ this.success = false;
+ this.error = error;
+ this.bytesCopied = -1;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public IOException getError() {
+ return error;
+ }
+
+ public int getBytesCopied() {
+ return bytesCopied;
+ }
+ }
+
+ private static class Request {
+
+ private final OutputStream out;
+ private final InputStream in;
+ private final long maxBytesToCopy;
+ private final BlockingQueue<Response> responseQueue;
+
+ public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) {
+ this.out = out;
+ this.in = in;
+ this.maxBytesToCopy = maxBytesToCopy;
+ this.responseQueue = new LinkedBlockingQueue<Response>(1);
+ }
+
+ public BlockingQueue<Response> getResponseQueue() {
+ return this.responseQueue;
+ }
+
+ public OutputStream getOutputStream() {
+ return out;
+ }
+
+ public InputStream getInputStream() {
+ return in;
+ }
+
+ public long getMaxBytesToCopy() {
+ return maxBytesToCopy;
+ }
+
+ @Override
+ public String toString() {
+ return "Request[maxBytes=" + maxBytesToCopy + "]";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
new file mode 100644
index 0000000..0e75a22
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableInputStream.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Wraps and InputStream so that the underlying InputStream cannot be closed.
+ * This is used so that the InputStream can be wrapped with yet another
+ * InputStream and prevent the outer layer from closing the inner InputStream
+ */
+public class NonCloseableInputStream extends FilterInputStream {
+
+ private final InputStream toWrap;
+
+ public NonCloseableInputStream(final InputStream toWrap) {
+ super(toWrap);
+ this.toWrap = toWrap;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return toWrap.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return toWrap.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return toWrap.read(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
new file mode 100644
index 0000000..9c77637
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NonCloseableOutputStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class NonCloseableOutputStream extends FilterOutputStream {
+
+ private final OutputStream out;
+
+ public NonCloseableOutputStream(final OutputStream out) {
+ super(out);
+ this.out = out;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.flush();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
new file mode 100644
index 0000000..8452761
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/NullOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * OutputStream that throws away all data, just like as if writing to /dev/null
+ */
+public class NullOutputStream extends OutputStream {
+
+ @Override
+ public void write(final int b) throws IOException {
+ }
+
+ @Override
+ public void write(final byte[] b) throws IOException {
+ }
+
+ @Override
+ public void write(final byte[] b, int off, int len) throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
new file mode 100644
index 0000000..9158050
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamThrottler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public interface StreamThrottler extends Closeable {
+
+ long copy(InputStream in, OutputStream out) throws IOException;
+
+ long copy(InputStream in, OutputStream out, long maxBytes) throws IOException;
+
+ InputStream newThrottledInputStream(final InputStream toWrap);
+
+ OutputStream newThrottledOutputStream(final OutputStream toWrap);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
new file mode 100644
index 0000000..8e3d606
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/StreamUtils.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.stream.io.exception.BytePatternNotFoundException;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+
+public class StreamUtils {
+
+ public static long copy(final InputStream source, final OutputStream destination) throws IOException {
+ final byte[] buffer = new byte[8192];
+ int len;
+ long totalCount = 0L;
+ while ((len = source.read(buffer)) > 0) {
+ destination.write(buffer, 0, len);
+ totalCount += len;
+ }
+ return totalCount;
+ }
+
+ /**
+ * Copies <code>numBytes</code> from <code>source</code> to
+ * <code>destination</code>. If <code>numBytes</code> are not available from
+ * <code>source</code>, throws EOFException
+ *
+ * @param source
+ * @param destination
+ * @param numBytes
+ * @throws IOException
+ */
+ public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException {
+ final byte[] buffer = new byte[8192];
+ int len;
+ long bytesLeft = numBytes;
+ while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) {
+ destination.write(buffer, 0, len);
+ bytesLeft -= len;
+ }
+
+ if (bytesLeft > 0) {
+ throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available");
+ }
+ }
+
+ /**
+ * Reads data from the given input stream, copying it to the destination
+ * byte array. If the InputStream has less data than the given byte array,
+ * throws an EOFException
+ *
+ * @param source
+ * @param destination
+ * @throws IOException
+ */
+ public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException {
+ fillBuffer(source, destination, true);
+ }
+
+ /**
+ * Reads data from the given input stream, copying it to the destination
+ * byte array. If the InputStream has less data than the given byte array,
+ * throws an EOFException if <code>ensureCapacity</code> is true and
+ * otherwise returns the number of bytes copied
+ *
+ * @param source
+ * @param destination
+ * @param ensureCapacity whether or not to enforce that the InputStream have
+ * at least as much data as the capacity of the destination byte array
+ * @return
+ * @throws IOException
+ */
+ public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException {
+ int bytesRead = 0;
+ int len;
+ while (bytesRead < destination.length) {
+ len = source.read(destination, bytesRead, destination.length - bytesRead);
+ if (len < 0) {
+ if (ensureCapacity) {
+ throw new EOFException();
+ } else {
+ break;
+ }
+ }
+
+ bytesRead += len;
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Copies data from in to out until either we are out of data (returns null)
+ * or we hit one of the byte patterns identified by the
+ * <code>stoppers</code> parameter (returns the byte pattern matched). The
+ * bytes in the stopper will be copied.
+ *
+ * @param in
+ * @param out
+ * @param maxBytes
+ * @param stoppers
+ * @return the byte array matched, or null if end of stream was reached
+ * @throws IOException
+ */
+ public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
+ if (stoppers.length == 0) {
+ return null;
+ }
+
+ final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
+ for (final byte[] stopper : stoppers) {
+ circularBuffers.add(new NonThreadSafeCircularBuffer(stopper));
+ }
+
+ long bytesRead = 0;
+ while (true) {
+ final int next = in.read();
+ if (next == -1) {
+ return null;
+ } else if (maxBytes > 0 && ++bytesRead >= maxBytes) {
+ throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
+ }
+
+ out.write(next);
+
+ for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
+ if (circ.addAndCompare((byte) next)) {
+ return circ.getByteArray();
+ }
+ }
+ }
+ }
+
+ /**
+ * Copies data from in to out until either we are out of data (returns null)
+ * or we hit one of the byte patterns identified by the
+ * <code>stoppers</code> parameter (returns the byte pattern matched). The
+ * byte pattern matched will NOT be copied to the output and will be un-read
+ * from the input.
+ *
+ * @param in
+ * @param out
+ * @param maxBytes
+ * @param stoppers
+ * @return the byte array matched, or null if end of stream was reached
+ * @throws IOException
+ */
+ public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... stoppers) throws IOException {
+ if (stoppers.length == 0) {
+ return null;
+ }
+
+ int longest = 0;
+ NonThreadSafeCircularBuffer longestBuffer = null;
+ final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>();
+ for (final byte[] stopper : stoppers) {
+ final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper);
+ if (stopper.length > longest) {
+ longest = stopper.length;
+ longestBuffer = circularBuffer;
+ circularBuffers.add(0, circularBuffer);
+ } else {
+ circularBuffers.add(circularBuffer);
+ }
+ }
+
+ long bytesRead = 0;
+ while (true) {
+ final int next = in.read();
+ if (next == -1) {
+ return null;
+ } else if (maxBytes > 0 && bytesRead++ > maxBytes) {
+ throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format");
+ }
+
+ for (final NonThreadSafeCircularBuffer circ : circularBuffers) {
+ if (circ.addAndCompare((byte) next)) {
+ // The longest buffer has some data that may not have been written out yet; we need to make sure
+ // that we copy out those bytes.
+ final int bytesToCopy = longest - circ.getByteArray().length;
+ for (int i = 0; i < bytesToCopy; i++) {
+ final int oldestByte = longestBuffer.getOldestByte();
+ if (oldestByte != -1) {
+ out.write(oldestByte);
+ longestBuffer.addAndCompare((byte) 0);
+ }
+ }
+
+ return circ.getByteArray();
+ }
+ }
+
+ if (longestBuffer.isFilled()) {
+ out.write(longestBuffer.getOldestByte());
+ }
+ }
+ }
+
+ /**
+ * Skips the specified number of bytes from the InputStream
+ *
+ * If unable to skip that number of bytes, throws EOFException
+ *
+ * @param stream
+ * @param bytesToSkip
+ * @throws IOException
+ */
+ public static void skip(final InputStream stream, final long bytesToSkip) throws IOException {
+ if (bytesToSkip <= 0) {
+ return;
+ }
+ long totalSkipped = 0L;
+
+ // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only
+ // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last
+ // byte in order to make sure that we've consumed the number of bytes requested. We then check that
+ // the final byte, which we read, is not -1.
+ final long actualBytesToSkip = bytesToSkip - 1;
+ while (totalSkipped < actualBytesToSkip) {
+ final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped);
+ if (skippedThisIteration == 0) {
+ final int nextByte = stream.read();
+ if (nextByte == -1) {
+ throw new EOFException();
+ } else {
+ totalSkipped++;
+ }
+ }
+
+ totalSkipped += skippedThisIteration;
+ }
+
+ final int lastByte = stream.read();
+ if (lastByte == -1) {
+ throw new EOFException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
new file mode 100644
index 0000000..2b9050d
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ZipOutputStream.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.OutputStream;
+
+/**
+ * This class extends the {@link java.util.zip.ZipOutputStream} by providing a
+ * constructor that allows the user to specify the compression level. The
+ * default compression level is 1, as opposed to Java's default of 5.
+ */
+public class ZipOutputStream extends java.util.zip.ZipOutputStream {
+
+ public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+
+ public ZipOutputStream(final OutputStream out) {
+ this(out, DEFAULT_COMPRESSION_LEVEL);
+ }
+
+ public ZipOutputStream(final OutputStream out, final int compressionLevel) {
+ super(out);
+ def.setLevel(compressionLevel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
new file mode 100644
index 0000000..5d08616
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/BytePatternNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.exception;
+
+import java.io.IOException;
+
+public class BytePatternNotFoundException extends IOException {
+
+ private static final long serialVersionUID = -4128911284318513973L;
+
+ public BytePatternNotFoundException(final String explanation) {
+ super(explanation);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
new file mode 100644
index 0000000..b4b4c17
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/NonThreadSafeCircularBuffer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.util.Arrays;
+
+public class NonThreadSafeCircularBuffer {
+
+ private final byte[] lookingFor;
+ private final int[] buffer;
+ private int insertionPointer = 0;
+ private int bufferSize = 0;
+
+ public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
+ this.lookingFor = lookingFor;
+ buffer = new int[lookingFor.length];
+ Arrays.fill(buffer, -1);
+ }
+
+ public byte[] getByteArray() {
+ return lookingFor;
+ }
+
+ /**
+ * Returns the oldest byte in the buffer
+ *
+ * @return
+ */
+ public int getOldestByte() {
+ return buffer[insertionPointer];
+ }
+
+ public boolean isFilled() {
+ return bufferSize >= buffer.length;
+ }
+
+ public boolean addAndCompare(final byte data) {
+ buffer[insertionPointer] = data;
+ insertionPointer = (insertionPointer + 1) % lookingFor.length;
+
+ bufferSize++;
+ if (bufferSize < lookingFor.length) {
+ return false;
+ }
+
+ for (int i = 0; i < lookingFor.length; i++) {
+ final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+ if (compare != lookingFor[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
new file mode 100644
index 0000000..85bfd96
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/util/NaiveSearchRingBuffer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.Arrays;
+
+/**
+ * <p>
+ * A RingBuffer that can be used to scan byte sequences for subsequences.
+ * </p>
+ *
+ * <p>
+ * This class implements an efficient naive search algorithm, which allows the
+ * user of the library to identify byte sequences in a stream on-the-fly so that
+ * the stream can be segmented without having to buffer the data.
+ * </p>
+ *
+ * <p>
+ * The intended usage paradigm is:
+ * <code>
+ * <pre>
+ * final byte[] searchSequence = ...;
+ * final CircularBuffer buffer = new CircularBuffer(searchSequence);
+ * while ((int nextByte = in.read()) > 0) {
+ * if ( buffer.addAndCompare(nextByte) ) {
+ * // This byte is the last byte in the given sequence
+ * } else {
+ * // This byte does not complete the given sequence
+ * }
+ * }
+ * </pre>
+ * </code>
+ * </p>
+ */
+public class NaiveSearchRingBuffer {
+
+ private final byte[] lookingFor;
+ private final int[] buffer;
+ private int insertionPointer = 0;
+ private int bufferSize = 0;
+
+ public NaiveSearchRingBuffer(final byte[] lookingFor) {
+ this.lookingFor = lookingFor;
+ this.buffer = new int[lookingFor.length];
+ Arrays.fill(buffer, -1);
+ }
+
+ /**
+ * Returns the contents of the internal buffer, which represents the last X
+ * bytes added to the buffer, where X is the minimum of the number of bytes
+ * added to the buffer or the length of the byte sequence for which we are
+ * looking
+ *
+ * @return
+ */
+ public byte[] getBufferContents() {
+ final int contentLength = Math.min(lookingFor.length, bufferSize);
+ final byte[] contents = new byte[contentLength];
+ for (int i = 0; i < contentLength; i++) {
+ final byte nextByte = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+ contents[i] = nextByte;
+ }
+ return contents;
+ }
+
+ /**
+ * Returns the oldest byte in the buffer
+ *
+ * @return
+ */
+ public int getOldestByte() {
+ return buffer[insertionPointer];
+ }
+
+ /**
+ * Returns <code>true</code> if the number of bytes that have been added to
+ * the buffer is at least equal to the length of the byte sequence for which
+ * we are searching
+ *
+ * @return
+ */
+ public boolean isFilled() {
+ return bufferSize >= buffer.length;
+ }
+
+ /**
+ * Clears the internal buffer so that a new search may begin
+ */
+ public void clear() {
+ Arrays.fill(buffer, -1);
+ insertionPointer = 0;
+ bufferSize = 0;
+ }
+
+ /**
+ * Add the given byte to the buffer and notify whether or not the byte
+ * completes the desired byte sequence.
+ *
+ * @param data
+ * @return <code>true</code> if this byte completes the byte sequence,
+ * <code>false</code> otherwise.
+ */
+ public boolean addAndCompare(final byte data) {
+ buffer[insertionPointer] = data;
+ insertionPointer = (insertionPointer + 1) % lookingFor.length;
+
+ bufferSize++;
+ if (bufferSize < lookingFor.length) {
+ return false;
+ }
+
+ for (int i = 0; i < lookingFor.length; i++) {
+ final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
+ if (compare != lookingFor[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}