You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:00 UTC

[37/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
new file mode 100644
index 0000000..bc46b0f
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+
+public class CompressionOutputStream extends OutputStream {
+
+    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
+    public static final int MIN_BUFFER_SIZE = 8 << 10;
+
+    private final OutputStream out;
+    private final Deflater deflater;
+
+    private final byte[] buffer;
+    private final byte[] compressed;
+
+    private int bufferIndex = 0;
+    private boolean dataWritten = false;
+
+    public CompressionOutputStream(final OutputStream outStream) {
+        this(outStream, DEFAULT_BUFFER_SIZE);
+    }
+
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
+        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
+    }
+
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
+        }
+
+        this.out = outStream;
+        this.deflater = new Deflater(level);
+        this.deflater.setStrategy(strategy);
+        buffer = new byte[bufferSize];
+        compressed = new byte[bufferSize + 64];
+    }
+
+    /**
+     * Compresses the currently buffered chunk of data and sends it to the
+     * output stream
+     *
+     * @throws IOException
+     */
+    protected void compressAndWrite() throws IOException {
+        if (bufferIndex <= 0) {
+            return;
+        }
+
+        deflater.setInput(buffer, 0, bufferIndex);
+        deflater.finish();
+        final int compressedBytes = deflater.deflate(compressed);
+
+        writeChunkHeader(compressedBytes);
+        out.write(compressed, 0, compressedBytes);
+
+        bufferIndex = 0;
+        deflater.reset();
+    }
+
+    private void writeChunkHeader(final int compressedBytes) throws IOException {
+        // If we have already written data, write out a '1' to indicate that we have more data; when we close
+        // the stream, we instead write a '0' to indicate that we are finished sending data.
+        if (dataWritten) {
+            out.write(1);
+        }
+        out.write(SYNC_BYTES);
+        dataWritten = true;
+
+        writeInt(out, bufferIndex);
+        writeInt(out, compressedBytes);
+    }
+
+    private void writeInt(final OutputStream out, final int val) throws IOException {
+        out.write(val >>> 24);
+        out.write(val >>> 16);
+        out.write(val >>> 8);
+        out.write(val);
+    }
+
+    protected boolean bufferFull() {
+        return bufferIndex >= buffer.length;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        buffer[bufferIndex++] = (byte) (b & 0xFF);
+        if (bufferFull()) {
+            compressAndWrite();
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        int bytesLeft = len;
+        while (bytesLeft > 0) {
+            final int free = buffer.length - bufferIndex;
+            final int bytesThisIteration = Math.min(bytesLeft, free);
+            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
+            bufferIndex += bytesThisIteration;
+
+            bytesLeft -= bytesThisIteration;
+            if (bufferFull()) {
+                compressAndWrite();
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        compressAndWrite();
+        super.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        compressAndWrite();
+        out.write(0);   // indicate that the stream is finished.
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
new file mode 100644
index 0000000..e03dfbf
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+public class InterruptableInputStream extends InputStream {
+
+    private volatile boolean interrupted = false;
+    private final InputStream in;
+
+    public InterruptableInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        return in.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        return in.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        return in.read(b, off, len);
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        return in.markSupported();
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        in.reset();
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        return in.skip(n);
+    }
+
+    public void interrupt() {
+        interrupted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
new file mode 100644
index 0000000..cba5be6
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+public class InterruptableOutputStream extends OutputStream {
+
+    private final OutputStream out;
+    private volatile boolean interrupted = false;
+
+    public InterruptableOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+
+        out.flush();
+    }
+
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
new file mode 100644
index 0000000..68913bd
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BufferStateManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class);
+
+    private ByteBuffer buffer;
+    private Direction direction = Direction.WRITE;
+
+    public BufferStateManager(final ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public BufferStateManager(final ByteBuffer buffer, final Direction direction) {
+        this.buffer = buffer;
+        this.direction = direction;
+    }
+
+    /**
+     * Ensures that the buffer is at least as big as the size specified,
+     * resizing the buffer if necessary. This operation MAY change the direction
+     * of the buffer.
+     *
+     * @param requiredSize
+     */
+    public void ensureSize(final int requiredSize) {
+        if (buffer.capacity() < requiredSize) {
+            final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize);
+
+            // we have to read buffer so make sure the direction is correct.
+            if (direction == Direction.WRITE) {
+                buffer.flip();
+            }
+
+            // Copy from buffer to newBuffer
+            newBuffer.put(buffer);
+
+            // Swap the buffers
+            buffer = newBuffer;
+
+            // the new buffer is ready to be written to
+            direction = Direction.WRITE;
+        }
+    }
+
+    public ByteBuffer prepareForWrite(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        if (direction == Direction.READ) {
+            direction = Direction.WRITE;
+            buffer.position(buffer.limit());
+        }
+
+        buffer.limit(buffer.capacity());
+        return buffer;
+    }
+
+    public ByteBuffer prepareForRead(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        if (direction == Direction.WRITE) {
+            direction = Direction.READ;
+            buffer.flip();
+        }
+
+        return buffer;
+    }
+
+    /**
+     * Clears the contents of the buffer and sets direction to WRITE
+     */
+    public void clear() {
+        logger.debug("Clearing {}", buffer);
+        buffer.clear();
+        direction = Direction.WRITE;
+    }
+
+    public void compact() {
+        final String before = buffer.toString();
+        buffer.compact();
+        logger.debug("Before compact: {}, after: {}", before, buffer);
+        direction = Direction.WRITE;
+    }
+
+    public static enum Direction {
+
+        READ, WRITE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
new file mode 100644
index 0000000..32a3f26
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+public class SocketChannelInputStream extends InputStream {
+
+    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeoutMillis = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+    private Byte bufferedByte = null;
+
+    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    public void setTimeout(final int timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bufferedByte != null) {
+            final int retVal = bufferedByte & 0xFF;
+            bufferedByte = null;
+            return retVal;
+        }
+
+        oneByteBuffer.flip();
+        oneByteBuffer.clear();
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        do {
+            bytesRead = channel.read(oneByteBuffer);
+            if (bytesRead == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out reading from socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
+                }
+            }
+        } while (bytesRead == 0);
+
+        if (bytesRead == -1) {
+            return -1;
+        }
+        oneByteBuffer.flip();
+        return oneByteBuffer.get() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (bufferedByte != null) {
+            final byte retVal = bufferedByte;
+            bufferedByte = null;
+            b[off] = retVal;
+            return 1;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        do {
+            bytesRead = channel.read(buffer);
+            if (bytesRead == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out reading from socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
+                }
+            }
+        } while (bytesRead == 0);
+
+        return bytesRead;
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (bufferedByte != null) {
+            return 1;
+        }
+
+        isDataAvailable(); // attempt to read from socket
+        return (bufferedByte == null) ? 0 : 1;
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        if (bufferedByte != null) {
+            return true;
+        }
+
+        oneByteBuffer.flip();
+        oneByteBuffer.clear();
+        final int bytesRead = channel.read(oneByteBuffer);
+        if (bytesRead == -1) {
+            throw new EOFException("Peer has closed the stream");
+        }
+        if (bytesRead > 0) {
+            oneByteBuffer.flip();
+            bufferedByte = oneByteBuffer.get();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Closes the underlying socket channel.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
new file mode 100644
index 0000000..77049ad
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+public class SocketChannelOutputStream extends OutputStream {
+
+    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeout = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+
+    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    public void setTimeout(final int timeoutMillis) {
+        this.timeout = timeoutMillis;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        oneByteBuffer.flip();
+        oneByteBuffer.clear();
+        oneByteBuffer.put((byte) b);
+        oneByteBuffer.flip();
+
+        final int timeoutMillis = this.timeout;
+        long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesWritten;
+        while (oneByteBuffer.hasRemaining()) {
+            bytesWritten = channel.write(oneByteBuffer);
+            if (bytesWritten == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out writing to socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
+                }
+            } else {
+                return;
+            }
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+
+        final int timeoutMillis = this.timeout;
+        long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesWritten;
+        while (buffer.hasRemaining()) {
+            bytesWritten = channel.write(buffer);
+            if (bytesWritten == 0) {
+                if (System.currentTimeMillis() > maxTime) {
+                    throw new SocketTimeoutException("Timed out writing to socket");
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
+                }
+            } else {
+                maxTime = System.currentTimeMillis() + timeoutMillis;
+            }
+        }
+    }
+
+    /**
+     * Closes the underlying SocketChannel
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..5810488
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLSocketChannel implements Closeable {
+
+    public static final int MAX_WRITE_SIZE = 65536;
+
+    private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
+    private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+    private final String hostname;
+    private final int port;
+    private final SSLEngine engine;
+    private final SocketAddress socketAddress;
+
+    private BufferStateManager streamInManager;
+    private BufferStateManager streamOutManager;
+    private BufferStateManager appDataManager;
+
+    private SocketChannel channel;
+
+    private final byte[] oneByteBuffer = new byte[1];
+
+    private int timeoutMillis = 30000;
+    private volatile boolean connected = false;
+    private boolean handshaking = false;
+    private boolean closed = false;
+    private volatile boolean interrupted = false;
+
+    public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
+        this.socketAddress = new InetSocketAddress(hostname, port);
+        this.channel = SocketChannel.open();
+        this.hostname = hostname;
+        this.port = port;
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
+        if (!socketChannel.isConnected()) {
+            throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+        }
+
+        this.channel = socketChannel;
+
+        this.socketAddress = socketChannel.getRemoteAddress();
+        final Socket socket = socketChannel.socket();
+        this.hostname = socket.getInetAddress().getHostName();
+        this.port = socket.getPort();
+
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public void setTimeout(final int millis) {
+        this.timeoutMillis = millis;
+    }
+
+    public int getTimeout() {
+        return timeoutMillis;
+    }
+
+    public void connect() throws SSLHandshakeException, IOException {
+        try {
+            channel.configureBlocking(false);
+            if (!channel.isConnected()) {
+                final long startTime = System.currentTimeMillis();
+
+                if (!channel.connect(socketAddress)) {
+                    while (!channel.finishConnect()) {
+                        if (interrupted) {
+                            throw new TransmissionDisabledException();
+                        }
+                        if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                            throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
+                        }
+
+                        try {
+                            Thread.sleep(50L);
+                        } catch (final InterruptedException e) {
+                        }
+                    }
+                }
+            }
+            engine.beginHandshake();
+
+            performHandshake();
+            logger.debug("{} Successfully completed SSL handshake", this);
+
+            streamInManager.clear();
+            streamOutManager.clear();
+            appDataManager.clear();
+
+            connected = true;
+        } catch (final Exception e) {
+            logger.error("{} Failed to connect due to {}", this, e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            closeQuietly(channel);
+            engine.closeInbound();
+            engine.closeOutbound();
+            throw e;
+        }
+    }
+
+    public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
+        final X509Certificate[] certs = engine.getSession().getPeerCertificateChain();
+        if (certs == null || certs.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
+        }
+
+        final X509Certificate cert = certs[0];
+        cert.checkValidity();
+        return cert.getSubjectDN().getName().trim();
+    }
+
+    private void performHandshake() throws IOException {
+        // Generate handshake message
+        final byte[] emptyMessage = new byte[0];
+        handshaking = true;
+        logger.debug("{} Performing Handshake", this);
+
+        try {
+            while (true) {
+                switch (engine.getHandshakeStatus()) {
+                    case FINISHED:
+                        return;
+                    case NEED_WRAP: {
+                        final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+
+                        final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
+                        if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                            streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+                            continue;
+                        }
+
+                        if (wrapHelloResult.getStatus() != Status.OK) {
+                            throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
+                                    + wrapHelloResult.toString());
+                        }
+
+                        logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
+
+                        final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+                        final int bytesToSend = readableStreamOut.remaining();
+                        writeFully(readableStreamOut);
+                        logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
+
+                        streamOutManager.clear();
+                    }
+                    continue;
+                    case NEED_UNWRAP: {
+                        final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
+                        final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        // Read handshake response from other side
+                        logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
+                        SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
+                        logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
+
+                        if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                            final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                            final int bytesRead = readData(writableDataIn);
+                            if (bytesRead > 0) {
+                                logger.trace("{} Read {} bytes for handshake", this, bytesRead);
+                            }
+
+                            if (bytesRead < 0) {
+                                throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
+                            }
+                        } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
+                            throw new IOException("Channel was closed by peer during handshake");
+                        } else {
+                            streamInManager.compact();
+                            appDataManager.clear();
+                        }
+                    }
+                    break;
+                    case NEED_TASK:
+                        performTasks();
+                        continue;
+                    case NOT_HANDSHAKING:
+                        return;
+                }
+            }
+        } finally {
+            handshaking = false;
+        }
+    }
+
+    private void performTasks() {
+        Runnable runnable;
+        while ((runnable = engine.getDelegatedTask()) != null) {
+            runnable.run();
+        }
+    }
+
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            closeable.close();
+        } catch (final Exception e) {
+        }
+    }
+
+    private int readData(final ByteBuffer dest) throws IOException {
+        final long startTime = System.currentTimeMillis();
+
+        while (true) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            if (dest.remaining() == 0) {
+                return 0;
+            }
+
+            final int readCount = channel.read(dest);
+
+            if (readCount == 0) {
+                if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException();
+                }
+
+                continue;
+            }
+
+            logger.trace("{} Read {} bytes", this, readCount);
+            return readCount;
+        }
+    }
+
+    private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
+        SSLEngineResult result = null;
+
+        final ByteBuffer buff = src.prepareForRead(0);
+        final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+        logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+        while (buff.remaining() > 0) {
+            result = engine.wrap(buff, outBuff);
+            if (result.getStatus() == Status.OK) {
+                final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
+                writeFully(readableOutBuff);
+                streamOutManager.clear();
+            } else {
+                return result.getStatus();
+            }
+        }
+
+        return result.getStatus();
+    }
+
+    private void writeFully(final ByteBuffer src) throws IOException {
+        long lastByteWrittenTime = System.currentTimeMillis();
+
+        int bytesWritten = 0;
+        while (src.hasRemaining()) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            final int written = channel.write(src);
+            bytesWritten += written;
+            final long now = System.currentTimeMillis();
+            if (written > 0) {
+                lastByteWrittenTime = now;
+            } else {
+                if (now > lastByteWrittenTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (final InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException();
+                }
+            }
+        }
+
+        logger.trace("{} Wrote {} bytes", this, bytesWritten);
+    }
+
+    public boolean isClosed() {
+        if (closed) {
+            return true;
+        }
+        // need to detect if peer has sent closure handshake...if so the answer is true
+        final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        int readCount = 0;
+        try {
+            readCount = channel.read(writableInBuffer);
+        } catch (IOException e) {
+            logger.error("{} Failed to readData due to {}", new Object[]{this, e});
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            readCount = -1; // treat the condition same as if End of Stream
+        }
+        if (readCount == 0) {
+            return false;
+        }
+        if (readCount > 0) {
+            logger.trace("{} Read {} bytes", this, readCount);
+
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            try {
+                SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+                logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+                if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+                    // Drain the incoming TCP buffer
+                    final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+                    int bytesDiscarded = channel.read(discardBuffer);
+                    while (bytesDiscarded > 0) {
+                        discardBuffer.clear();
+                        bytesDiscarded = channel.read(discardBuffer);
+                    }
+                    engine.closeInbound();
+                } else {
+                    streamInManager.compact();
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
+                if (logger.isDebugEnabled()) {
+                    logger.error("", e);
+                }
+            }
+        }
+        // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
+        // so go ahead and close down the channel
+        closeQuietly(channel.socket());
+        closeQuietly(channel);
+        closed = true;
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("{} Closing Connection", this);
+        if (channel == null) {
+            return;
+        }
+
+        if (closed) {
+            return;
+        }
+
+        try {
+            engine.closeOutbound();
+
+            final byte[] emptyMessage = new byte[0];
+
+            final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+            final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
+
+            if (handshakeResult.getStatus() != Status.CLOSED) {
+                throw new IOException("Invalid close state - will not send network data");
+            }
+
+            final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+            writeFully(readableStreamOut);
+        } finally {
+            // Drain the incoming TCP buffer
+            final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+            try {
+                int bytesDiscarded = channel.read(discardBuffer);
+                while (bytesDiscarded > 0) {
+                    discardBuffer.clear();
+                    bytesDiscarded = channel.read(discardBuffer);
+                }
+            } catch (Exception e) {
+            }
+
+            closeQuietly(channel.socket());
+            closeQuietly(channel);
+            closed = true;
+        }
+    }
+
+    private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
+        // If any data already exists in the application data buffer, copy it to the buffer.
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+
+        final int appDataRemaining = appDataBuffer.remaining();
+        if (appDataRemaining > 0) {
+            final int bytesToCopy = Math.min(len, appDataBuffer.remaining());
+            appDataBuffer.get(buffer, offset, bytesToCopy);
+
+            final int bytesCopied = appDataRemaining - appDataBuffer.remaining();
+            logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
+                    new Object[]{this, bytesToCopy, bytesCopied});
+            return bytesCopied;
+        }
+        return 0;
+    }
+
+    public int available() throws IOException {
+        ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+        final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining();
+        if (buffered > 0) {
+            return buffered;
+        }
+
+        final boolean wasAbleToRead = isDataAvailable();
+        if (!wasAbleToRead) {
+            return 0;
+        }
+
+        appDataBuffer = appDataManager.prepareForRead(1);
+        streamDataBuffer = streamInManager.prepareForRead(1);
+        return appDataBuffer.remaining() + streamDataBuffer.remaining();
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+
+        if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) {
+            return true;
+        }
+
+        final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        final int bytesRead = channel.read(writableBuffer);
+        return (bytesRead > 0);
+    }
+
+    public int read() throws IOException {
+        final int bytesRead = read(oneByteBuffer);
+        if (bytesRead == -1) {
+            return -1;
+        }
+        return oneByteBuffer[0] & 0xFF;
+    }
+
+    public int read(final byte[] buffer) throws IOException {
+        return read(buffer, 0, buffer.length);
+    }
+
+    public int read(final byte[] buffer, final int offset, final int len) throws IOException {
+        logger.debug("{} Reading up to {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int copied = copyFromAppDataBuffer(buffer, offset, len);
+        if (copied > 0) {
+            return copied;
+        }
+
+        appDataManager.clear();
+
+        while (true) {
+            // prepare buffers and call unwrap
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            SSLEngineResult unwrapResponse = null;
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+            logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+
+            switch (unwrapResponse.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
+                case BUFFER_UNDERFLOW: {
+//                appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
+
+                    final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                    final int bytesRead = readData(writableInBuffer);
+                    if (bytesRead < 0) {
+                        return -1;
+                    }
+
+                    continue;
+                }
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case OK: {
+                    copied = copyFromAppDataBuffer(buffer, offset, len);
+                    if (copied == 0) {
+                        throw new IOException("Failed to decrypt data");
+                    }
+                    streamInManager.compact();
+                    return copied;
+                }
+            }
+        }
+    }
+
+    public void write(final int data) throws IOException {
+        write(new byte[]{(byte) data}, 0, 1);
+    }
+
+    public void write(final byte[] data) throws IOException {
+        write(data, 0, data.length);
+    }
+
+    public void write(final byte[] data, final int offset, final int len) throws IOException {
+        logger.debug("{} Writing {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int iterations = len / MAX_WRITE_SIZE;
+        if (len % MAX_WRITE_SIZE > 0) {
+            iterations++;
+        }
+
+        for (int i = 0; i < iterations; i++) {
+            streamOutManager.clear();
+            final int itrOffset = offset + i * MAX_WRITE_SIZE;
+            final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE);
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
+
+            final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
+            final Status status = encryptAndWriteFully(buffMan);
+            switch (status) {
+                case BUFFER_OVERFLOW:
+                    streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+                    appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+                    continue;
+                case OK:
+                    continue;
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case BUFFER_UNDERFLOW:
+                    throw new AssertionError("Got Buffer Underflow but should not have...");
+            }
+        }
+    }
+
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..154bd08
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SSLSocketChannelInputStream extends InputStream {
+
+    private final SSLSocketChannel channel;
+
+    public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return channel.read();
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return channel.read(b);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        return channel.read(b, off, len);
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which will also close the
+     * OutputStream and connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    @Override
+    public int available() throws IOException {
+        return channel.available();
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        return available() > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..ce4e420
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+    private final SSLSocketChannel channel;
+
+    public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        channel.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        channel.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        channel.write(b, off, len);
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which also will close the
+     * InputStream and the connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
new file mode 100644
index 0000000..bd30a96
--- /dev/null
+++ b/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+
+import org.junit.Test;
+
+public class TestCompressionInputOutputStreams {
+
+    @Test
+    public void testSimple() throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final byte[] data = "Hello, World!".getBytes("UTF-8");
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos);
+        cos.write(data);
+        cos.flush();
+        cos.close();
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
+        final byte[] decompressed = readFully(cis);
+
+        assertTrue(Arrays.equals(data, decompressed));
+    }
+
+    @Test
+    public void testDataLargerThanBuffer() throws IOException {
+        final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
+
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 100; i++) {
+            sb.append(str);
+        }
+        final byte[] data = sb.toString().getBytes("UTF-8");
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
+        cos.write(data);
+        cos.flush();
+        cos.close();
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
+        final byte[] decompressed = readFully(cis);
+
+        assertTrue(Arrays.equals(data, decompressed));
+    }
+
+    @Test
+    public void testDataLargerThanBufferWhileFlushing() throws IOException {
+        final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
+        final byte[] data = str.getBytes("UTF-8");
+
+        final StringBuilder sb = new StringBuilder();
+        final byte[] data1024;
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
+        for (int i = 0; i < 1024; i++) {
+            cos.write(data);
+            cos.flush();
+            sb.append(str);
+        }
+        cos.close();
+        data1024 = sb.toString().getBytes("UTF-8");
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes));
+        final byte[] decompressed = readFully(cis);
+
+        assertTrue(Arrays.equals(data1024, decompressed));
+    }
+
+    @Test
+    public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException {
+        final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
+        final byte[] data = str.getBytes("UTF-8");
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192);
+        for (int i = 0; i < 512; i++) {
+            cos.write(data);
+            cos.flush();
+        }
+        cos.close();
+
+        final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192);
+        for (int i = 0; i < 512; i++) {
+            cos2.write(data);
+            cos2.flush();
+        }
+        cos2.close();
+
+        final byte[] data512;
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 512; i++) {
+            sb.append(str);
+        }
+        data512 = sb.toString().getBytes("UTF-8");
+
+        final byte[] compressedBytes = baos.toByteArray();
+        final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes);
+
+        final CompressionInputStream cis = new CompressionInputStream(bais);
+        final byte[] decompressed = readFully(cis);
+        assertTrue(Arrays.equals(data512, decompressed));
+
+        final CompressionInputStream cis2 = new CompressionInputStream(bais);
+        final byte[] decompressed2 = readFully(cis2);
+        assertTrue(Arrays.equals(data512, decompressed2));
+    }
+
+    private byte[] readFully(final InputStream in) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final byte[] buffer = new byte[65536];
+        int len;
+        while ((len = in.read(buffer)) >= 0) {
+            baos.write(buffer, 0, len);
+        }
+
+        return baos.toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/search-utils/pom.xml b/commons/search-utils/pom.xml
new file mode 100644
index 0000000..569958f
--- /dev/null
+++ b/commons/search-utils/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-search-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>search-utils</name>
+
+    <dependencies>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java
new file mode 100644
index 0000000..59b444a
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import org.apache.nifi.util.search.ahocorasick.SearchState;
+
+/**
+ * Defines an interface to search for content given a set of search terms. Any
+ * implementation of search must be thread safe.
+ *
+ * @author
+ * @param <T>
+ */
+public interface Search<T> {
+
+    /**
+     * Establishes the dictionary of terms which will be searched in subsequent
+     * search calls. This can be called only once
+     *
+     * @param terms
+     */
+    void initializeDictionary(Set<SearchTerm<T>> terms);
+
+    /**
+     * Searches the given input stream for matches between the already specified
+     * dictionary and the contents scanned.
+     *
+     * @param haystack
+     * @param findAll if true will find all matches if false will find only the
+     * first match
+     * @return SearchState containing results Map might be empty which indicates
+     * no matches found but will not be null
+     * @throws IOException Thrown for any exceptions occurring while searching.
+     * @throws IllegalStateException if the dictionary has not yet been
+     * initialized
+     */
+    SearchState<T> search(InputStream haystack, boolean findAll) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
new file mode 100644
index 0000000..62de964
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * This is an immutable thread safe object representing a search term
+ *
+ * @author
+ * @param <T>
+ */
+public class SearchTerm<T> {
+
+    private final byte[] bytes;
+    private final int hashCode;
+    private final T reference;
+
+    /**
+     * Constructs a SearchTerm. Defensively copies the given byte array
+     *
+     * @param bytes
+     * @throws IllegalArgument exception if given bytes are null or 0 length
+     */
+    public SearchTerm(final byte[] bytes) {
+        this(bytes, true, null);
+    }
+
+    /**
+     * Constructs a search term. Optionally performs a defensive copy of the
+     * given byte array. If the caller indicates a defensive copy is not
+     * necessary then they must not change the given arrays state any longer
+     *
+     * @param bytes
+     * @param defensiveCopy
+     * @param reference
+     */
+    public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) {
+        if (bytes == null || bytes.length == 0) {
+            throw new IllegalArgumentException();
+        }
+        if (defensiveCopy) {
+            this.bytes = Arrays.copyOf(bytes, bytes.length);
+        } else {
+            this.bytes = bytes;
+        }
+        this.hashCode = Arrays.hashCode(this.bytes);
+        this.reference = reference;
+    }
+
+    public int get(final int index) {
+        return bytes[index] & 0xff;
+    }
+
+    /**
+     * @return size in of search term in bytes
+     */
+    public int size() {
+        return bytes.length;
+    }
+
+    /**
+     * @return reference object for this given search term
+     */
+    public T getReference() {
+        return reference;
+    }
+
+    /**
+     * Determines if the given window starts with the same bytes as this term
+     *
+     * @param window Current window of bytes from the haystack being evaluated.
+     * @param windowLength The length of the window to consider
+     * @return true if this term starts with the same bytes of the given window
+     */
+    public boolean startsWith(byte[] window, int windowLength) {
+        if (windowLength > window.length) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (bytes.length < windowLength) {
+            return false;
+        }
+        for (int i = 0; i < bytes.length && i < windowLength; i++) {
+            if (bytes[i] != window[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return a defensive copy of the internal byte structure
+     */
+    public byte[] getBytes() {
+        return Arrays.copyOf(bytes, bytes.length);
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final SearchTerm other = (SearchTerm) obj;
+        if (this.hashCode != other.hashCode) {
+            return false;
+        }
+        return Arrays.equals(this.bytes, other.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return new String(bytes);
+    }
+
+    public String toString(final Charset charset) {
+        return new String(bytes, charset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
new file mode 100644
index 0000000..3b8afaf
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.nifi.util.search.Search;
+import org.apache.nifi.util.search.SearchTerm;
+
+public class AhoCorasick<T> implements Search<T> {
+
+    private Node root = null;
+
+    /**
+     * Constructs a new search object.
+     *
+     * @throws IllegalArgumentException if given terms are null or empty
+     */
+    public AhoCorasick() {
+    }
+
+    @Override
+    public void initializeDictionary(final Set<SearchTerm<T>> terms) {
+        if (root != null) {
+            throw new IllegalStateException();
+        }
+        root = new Node();
+        if (terms == null || terms.isEmpty()) {
+            throw new IllegalArgumentException();
+        }
+        for (final SearchTerm<T> term : terms) {
+            int i = 0;
+            Node nextNode = root;
+            while (true) {
+                nextNode = addMatch(term, i, nextNode);
+                if (nextNode == null) {
+                    break; //we're done
+                }
+                i++;
+            }
+        }
+        initialize();
+    }
+
+    private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) {
+        final int index = term.get(offset);
+        boolean atEnd = (offset == (term.size() - 1));
+        if (current.getNeighbor(index) == null) {
+            if (atEnd) {
+                current.setNeighbor(new Node(term), index);
+                return null;
+            }
+            current.setNeighbor(new Node(), index);
+        } else if (atEnd) {
+            current.getNeighbor(index).setMatchingTerm(term);
+            return null;
+        }
+        return current.getNeighbor(index);
+    }
+
+    private void initialize() {
+        //perform bgs to build failure links
+        final Queue<Node> queue = new LinkedList<>();
+        queue.add(root);
+        root.setFailureNode(null);
+        while (!queue.isEmpty()) {
+            final Node current = queue.poll();
+            for (int i = 0; i < 256; i++) {
+                final Node next = current.getNeighbor(i);
+                if (next != null) {
+                    //traverse failure to get state
+                    Node fail = current.getFailureNode();
+                    while ((fail != null) && fail.getNeighbor(i) == null) {
+                        fail = fail.getFailureNode();
+                    }
+                    if (fail != null) {
+                        next.setFailureNode(fail.getNeighbor(i));
+                    } else {
+                        next.setFailureNode(root);
+                    }
+                    queue.add(next);
+                }
+            }
+        }
+    }
+
+    @Override
+    public SearchState search(final InputStream stream, final boolean findAll) throws IOException {
+        return search(stream, findAll, null);
+    }
+
+    private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException {
+        if (root == null) {
+            throw new IllegalStateException();
+        }
+        final SearchState<T> currentState = (state == null) ? new SearchState(root) : state;
+        if (!findAll && currentState.foundMatch()) {
+            throw new IllegalStateException("A match has already been found yet we're being asked to keep searching");
+        }
+        Node current = currentState.getCurrentNode();
+        int currentChar;
+        while ((currentChar = stream.read()) >= 0) {
+            currentState.incrementBytesRead(1L);
+            Node next = current.getNeighbor(currentChar);
+            if (next == null) {
+                next = current.getFailureNode();
+                while ((next != null) && next.getNeighbor(currentChar) == null) {
+                    next = next.getFailureNode();
+                }
+                if (next != null) {
+                    next = next.getNeighbor(currentChar);
+                } else {
+                    next = root;
+                }
+            }
+            if (next == null) {
+                throw new IllegalStateException("tree out of sync");
+            }
+            //Accept condition
+            if (next.hasMatch()) {
+                currentState.addResult(next.getMatchingTerm());
+            }
+            for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) {
+                if (failNode.hasMatch()) {
+                    currentState.addResult(failNode.getMatchingTerm());
+                }
+            }
+            current = next;
+            if (currentState.foundMatch() && !findAll) {
+                break;//give up as soon as we have at least one match
+            }
+        }
+        currentState.setCurrentNode(current);
+        return currentState;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
new file mode 100644
index 0000000..0ac325c
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+/**
+ *
+ * @author
+ */
+public class Node {
+
+    private final Map<Integer, Node> neighborMap;
+    private Node failureNode;
+    private SearchTerm<?> term;
+
+    Node(final SearchTerm<?> term) {
+        this();
+        this.term = term;
+    }
+
+    Node() {
+        neighborMap = new HashMap<>();
+        term = null;
+    }
+
+    void setFailureNode(final Node fail) {
+        failureNode = fail;
+    }
+
+    public Node getFailureNode() {
+        return failureNode;
+    }
+
+    public boolean hasMatch() {
+        return term != null;
+    }
+
+    void setMatchingTerm(final SearchTerm<?> term) {
+        this.term = term;
+    }
+
+    public SearchTerm<?> getMatchingTerm() {
+        return term;
+    }
+
+    public Node getNeighbor(final int index) {
+        return neighborMap.get(index);
+    }
+
+    void setNeighbor(final Node neighbor, final int index) {
+        neighborMap.put(index, neighbor);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
new file mode 100644
index 0000000..6d36ad0
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+public class SearchState<T> {
+
+    private Node currentNode;
+    private final Map<SearchTerm<T>, List<Long>> resultMap;
+    private long bytesRead;
+
+    SearchState(final Node rootNode) {
+        resultMap = new HashMap<>(5);
+        currentNode = rootNode;
+        bytesRead = 0L;
+    }
+
+    void incrementBytesRead(final long increment) {
+        bytesRead += increment;
+    }
+
+    void setCurrentNode(final Node curr) {
+        currentNode = curr;
+    }
+
+    public Node getCurrentNode() {
+        return currentNode;
+    }
+
+    public Map<SearchTerm<T>, List<Long>> getResults() {
+        return new HashMap<>(resultMap);
+    }
+
+    void addResult(final SearchTerm matchingTerm) {
+        final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5);
+        indexes.add(bytesRead);
+        resultMap.put(matchingTerm, indexes);
+    }
+
+    public boolean foundMatch() {
+        return !resultMap.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/.gitignore
----------------------------------------------------------------------
diff --git a/commons/wali/.gitignore b/commons/wali/.gitignore
new file mode 100755
index 0000000..19f2e00
--- /dev/null
+++ b/commons/wali/.gitignore
@@ -0,0 +1,2 @@
+/target
+/target

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/pom.xml
----------------------------------------------------------------------
diff --git a/commons/wali/pom.xml b/commons/wali/pom.xml
new file mode 100644
index 0000000..ce04973
--- /dev/null
+++ b/commons/wali/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <groupId>wali</groupId>
+    <artifactId>wali</artifactId>
+
+    <version>3.0.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>WALI : Write-Ahead Log Implementation</name>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-stream-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>