You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/20 06:52:28 UTC

[6/8] incubator-nifi git commit: NIFI-189 Collapsed several commons libs into one

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
deleted file mode 100644
index 33fb444..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public interface UpdateMonitor {
-
-    Object getCurrentState(Path path) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
new file mode 100644
index 0000000..77c34c9
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class StandardVersionNegotiator implements VersionNegotiator {
+
+    private final List<Integer> versions;
+    private int curVersion;
+
+    public StandardVersionNegotiator(final int... supportedVersions) {
+        if (Objects.requireNonNull(supportedVersions).length == 0) {
+            throw new IllegalArgumentException("At least one version must be supported");
+        }
+
+        final List<Integer> supported = new ArrayList<>();
+        for (final int version : supportedVersions) {
+            supported.add(version);
+        }
+        this.versions = Collections.unmodifiableList(supported);
+        this.curVersion = supportedVersions[0];
+    }
+
+    @Override
+    public int getVersion() {
+        return curVersion;
+    }
+
+    @Override
+    public void setVersion(final int version) throws IllegalArgumentException {
+        if (!isVersionSupported(version)) {
+            throw new IllegalArgumentException("Version " + version + " is not supported");
+        }
+
+        this.curVersion = version;
+    }
+
+    @Override
+    public int getPreferredVersion() {
+        return versions.get(0);
+    }
+
+    @Override
+    public Integer getPreferredVersion(final int maxVersion) {
+        for (final Integer version : this.versions) {
+            if (maxVersion >= version) {
+                return version;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isVersionSupported(final int version) {
+        return versions.contains(version);
+    }
+
+    @Override
+    public List<Integer> getSupportedVersions() {
+        return versions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
new file mode 100644
index 0000000..74f9b3d
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.List;
+
+public interface VersionNegotiator {
+
+    /**
+     * @return the currently configured Version of this resource
+     */
+    int getVersion();
+
+    /**
+     * Sets the version of this resource to the specified version. Only the
+     * lower byte of the version is relevant.
+     *
+     * @param version the version that this resource should switch to
+     * @throws IllegalArgumentException if the given Version is not supported by
+     * this resource, as is indicated by the {@link #isVersionSupported(int)}
+     * method
+     */
+    void setVersion(int version) throws IllegalArgumentException;
+
+    /**
+     *
+     * @return the Version of this resource that is preferred
+     */
+    int getPreferredVersion();
+
+    /**
+     * Gets the preferred version of this resource that is no greater than the
+     * given maxVersion. If no supported version is less than or equal to
+     * <code>maxVersion</code>, then <code>null</code> is returned
+     *
+     * @param maxVersion the highest version that is acceptable to the caller
+     * @return the preferred version not exceeding <code>maxVersion</code>, or <code>null</code> if there is none
+     */
+    Integer getPreferredVersion(int maxVersion);
+
+    /**
+     * Indicates whether or not the specified version is supported by this
+     * resource
+     *
+     * @param version the version to check
+     * @return <code>true</code> if the version is supported, <code>false</code> otherwise
+     */
+    boolean isVersionSupported(int version);
+    /** @return the versions that this resource supports */
+    List<Integer> getSupportedVersions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
new file mode 100644
index 0000000..05fd915
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Indicates that the user disabled transmission while communications were
+ * taking place with a peer. Unchecked, so it can be thrown from any method.
+ */
+public class TransmissionDisabledException extends RuntimeException {
+    private static final long serialVersionUID = 1L; // explicit: the class is Serializable via Throwable
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
new file mode 100644
index 0000000..71cf894
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+public class CompressionInputStream extends InputStream {
+    // Reads the chunked, deflate-compressed format identified by CompressionOutputStream.SYNC_BYTES.
+    private final InputStream in;
+    private final Inflater inflater;
+
+    private byte[] compressedBuffer;    // still-compressed payload of the current chunk
+    private byte[] buffer;              // decompressed payload of the current chunk
+
+    private int bufferIndex;            // position in buffer of the next byte to hand out
+    private boolean eos = false;    // whether or not we've reached the end of stream
+    private boolean allDataRead = false;    // different from eos b/c eos means allDataRead == true && buffer is empty
+
+    private final byte[] fourByteBuffer = new byte[4];
+
+    public CompressionInputStream(final InputStream in) {
+        this.in = in;
+        inflater = new Inflater();
+
+        buffer = new byte[0];
+        compressedBuffer = new byte[0];
+        bufferIndex = 1;    // past the end of the empty buffer, so the first read loads a chunk
+    }
+
+    private String toHex(final byte[] array) {
+        final StringBuilder sb = new StringBuilder("0x");
+        for (final byte b : array) {
+            final String hex = Integer.toHexString(b & 0xFF).toUpperCase();  // mask: no sign extension to 8 digits
+            if (hex.length() == 1) {
+                sb.append("0");
+            }
+            sb.append(hex);
+        }
+        return sb.toString();
+    }
+
+    // Validates the SYNC marker, then sizes both chunk buffers from the two big-endian lengths that follow.
+    protected void readChunkHeader() throws IOException {
+        // Ensure that we have a valid SYNC chunk
+        fillBuffer(fourByteBuffer);
+        if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
+            throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
+        }
+
+        // determine the size of the decompressed buffer
+        fillBuffer(fourByteBuffer);
+        buffer = new byte[toInt(fourByteBuffer)];
+
+        // determine the size of the compressed buffer
+        fillBuffer(fourByteBuffer);
+        compressedBuffer = new byte[toInt(fourByteBuffer)];
+        bufferIndex = buffer.length;    // indicate that buffer is empty
+    }
+
+    private int toInt(final byte[] data) {
+        return ((data[0] & 0xFF) << 24)
+                | ((data[1] & 0xFF) << 16)
+                | ((data[2] & 0xFF) << 8)
+                | (data[3] & 0xFF);
+    }
+
+    // Inflates the next chunk into buffer, or flags end of stream once the last chunk has been consumed.
+    protected void bufferAndDecompress() throws IOException {
+        if (allDataRead) {
+            eos = true;
+            return;
+        }
+        readChunkHeader();
+        fillBuffer(compressedBuffer);
+
+        inflater.setInput(compressedBuffer);
+        try {
+            inflater.inflate(buffer);
+        } catch (final DataFormatException e) {
+            throw new IOException(e);
+        }
+        inflater.reset();
+
+        bufferIndex = 0;
+        final int moreDataByte = in.read();
+        if (moreDataByte < 1) {
+            allDataRead = true;
+        } else if (moreDataByte > 1) {
+            throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
+        }
+    }
+
+    // Fills the whole array from the wrapped stream; throws EOFException if the stream ends first.
+    private void fillBuffer(final byte[] buffer) throws IOException {
+        int len;
+        int bytesLeft = buffer.length;
+        int bytesRead = 0;
+        while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
+            bytesLeft -= len;
+            bytesRead += len;
+        }
+        if (bytesRead < buffer.length) {
+            throw new EOFException();
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return bufferIndex >= buffer.length;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+        // Mask to 0-255 as InputStream.read() requires; a raw 0xFF byte would otherwise look like EOF (-1).
+        return buffer[bufferIndex++] & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        final int free = buffer.length - bufferIndex;
+        final int bytesToTransfer = Math.min(len, free);
+        System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
+        bufferIndex += bytesToTransfer;
+
+        return bytesToTransfer;
+    }
+
+    /**
+     * Does nothing. Does NOT close underlying InputStream
+     * @throws java.io.IOException never thrown by this implementation
+     */
+    @Override
+    public void close() throws IOException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
new file mode 100644
index 0000000..bc46b0f
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+
+public class CompressionOutputStream extends OutputStream {
+
+    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
+    public static final int MIN_BUFFER_SIZE = 8 << 10;
+
+    private final OutputStream out;
+    private final Deflater deflater;
+
+    private final byte[] buffer;
+    private final byte[] compressed;
+
+    private int bufferIndex = 0;
+    private boolean dataWritten = false;
+
+    public CompressionOutputStream(final OutputStream outStream) {
+        this(outStream, DEFAULT_BUFFER_SIZE);
+    }
+
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
+        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
+    }
+
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
+        }
+
+        this.out = outStream;
+        this.deflater = new Deflater(level);
+        this.deflater.setStrategy(strategy);
+        buffer = new byte[bufferSize];
+        compressed = new byte[bufferSize + 64];
+    }
+
+    /**
+     * Compresses the currently buffered chunk of data and sends it to the
+     * output stream
+     *
+     * @throws IOException
+     */
+    protected void compressAndWrite() throws IOException {
+        if (bufferIndex <= 0) {
+            return;
+        }
+
+        deflater.setInput(buffer, 0, bufferIndex);
+        deflater.finish();
+        final int compressedBytes = deflater.deflate(compressed);
+
+        writeChunkHeader(compressedBytes);
+        out.write(compressed, 0, compressedBytes);
+
+        bufferIndex = 0;
+        deflater.reset();
+    }
+
+    private void writeChunkHeader(final int compressedBytes) throws IOException {
+        // If we have already written data, write out a '1' to indicate that we have more data; when we close
+        // the stream, we instead write a '0' to indicate that we are finished sending data.
+        if (dataWritten) {
+            out.write(1);
+        }
+        out.write(SYNC_BYTES);
+        dataWritten = true;
+
+        writeInt(out, bufferIndex);
+        writeInt(out, compressedBytes);
+    }
+
+    private void writeInt(final OutputStream out, final int val) throws IOException {
+        out.write(val >>> 24);
+        out.write(val >>> 16);
+        out.write(val >>> 8);
+        out.write(val);
+    }
+
+    protected boolean bufferFull() {
+        return bufferIndex >= buffer.length;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        buffer[bufferIndex++] = (byte) (b & 0xFF);
+        if (bufferFull()) {
+            compressAndWrite();
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        int bytesLeft = len;
+        while (bytesLeft > 0) {
+            final int free = buffer.length - bufferIndex;
+            final int bytesThisIteration = Math.min(bytesLeft, free);
+            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
+            bufferIndex += bytesThisIteration;
+
+            bytesLeft -= bytesThisIteration;
+            if (bufferFull()) {
+                compressAndWrite();
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        compressAndWrite();
+        super.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        compressAndWrite();
+        out.write(0);   // indicate that the stream is finished.
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
new file mode 100644
index 0000000..e03dfbf
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+public class InterruptableInputStream extends InputStream {
+    // Delegates every call to the wrapped stream until interrupt() is invoked; afterwards each
+    // call fails with TransmissionDisabledException instead of reaching the wrapped stream.
+
+    private final InputStream in;
+    // Set by interrupt() and never cleared afterwards.
+    private volatile boolean interrupted = false;
+
+    public InterruptableInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    /**
+     * Marks this stream as interrupted; from then on every other method of
+     * this stream throws TransmissionDisabledException.
+     */
+    public void interrupt() {
+        interrupted = true;
+    }
+
+    /**
+     * Guard shared by all delegating methods.
+     *
+     * @throws TransmissionDisabledException if interrupt() has been called
+     */
+    private void failIfInterrupted() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+    }
+
+    // Reads a single byte from the wrapped stream.
+    @Override
+    public int read() throws IOException {
+        failIfInterrupted();
+        return in.read();
+    }
+
+    // Reads into the whole array from the wrapped stream.
+    @Override
+    public int read(byte[] b) throws IOException {
+        failIfInterrupted();
+        return in.read(b);
+    }
+
+    // Reads up to len bytes into b, starting at off, from the wrapped stream.
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        failIfInterrupted();
+        return in.read(b, off, len);
+    }
+
+    // Reports what the wrapped stream reports as available.
+    @Override
+    public int available() throws IOException {
+        failIfInterrupted();
+        return in.available();
+    }
+
+    // Closes the wrapped stream; once interrupted this throws and the wrapped stream is not closed.
+    @Override
+    public void close() throws IOException {
+        failIfInterrupted();
+        in.close();
+    }
+
+    // Forwards the mark request to the wrapped stream.
+    @Override
+    public synchronized void mark(int readlimit) {
+        failIfInterrupted();
+        in.mark(readlimit);
+    }
+
+    // Reports whether the wrapped stream supports mark/reset.
+    @Override
+    public boolean markSupported() {
+        failIfInterrupted();
+        return in.markSupported();
+    }
+
+    // Forwards the reset request to the wrapped stream.
+    @Override
+    public synchronized void reset() throws IOException {
+        failIfInterrupted();
+        in.reset();
+    }
+
+    // Skips bytes on the wrapped stream and returns the count it reports.
+    @Override
+    public long skip(long n) throws IOException {
+        failIfInterrupted();
+        return in.skip(n);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
new file mode 100644
index 0000000..cba5be6
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+public class InterruptableOutputStream extends OutputStream {
+
+    private final OutputStream out;
+    // Set by interrupt() and never cleared afterwards.
+    private volatile boolean interrupted = false;
+
+    public InterruptableOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    // Marks this stream as interrupted; from then on every other method of this
+    // stream throws TransmissionDisabledException.
+    public void interrupt() {
+        this.interrupted = true;
+    }
+
+    // Guard shared by all delegating methods: throws once interrupt() has been called.
+    private void failIfInterrupted() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+    }
+
+    // Writes a single byte to the wrapped stream.
+    @Override
+    public void write(int b) throws IOException {
+        failIfInterrupted();
+        out.write(b);
+    }
+
+    // Writes the whole array to the wrapped stream.
+    @Override
+    public void write(byte[] b) throws IOException {
+        failIfInterrupted();
+        out.write(b);
+    }
+
+    // Writes len bytes of b, starting at off, to the wrapped stream.
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        failIfInterrupted();
+        out.write(b, off, len);
+    }
+
+    // Closes the wrapped stream; once interrupted this throws and the wrapped stream is not closed.
+    @Override
+    public void close() throws IOException {
+        failIfInterrupted();
+        out.close();
+    }
+
+    // Flushes the wrapped stream.
+    @Override
+    public void flush() throws IOException {
+        failIfInterrupted();
+        out.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
new file mode 100644
index 0000000..68913bd
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BufferStateManager {
+    // Tracks whether the wrapped ByteBuffer is currently laid out for writing or for reading.
+    private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class);
+
+    private ByteBuffer buffer;  // swapped for a larger buffer by ensureSize when capacity is too small
+    private Direction direction = Direction.WRITE;  // layout the buffer is currently in
+
+    public BufferStateManager(final ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    public BufferStateManager(final ByteBuffer buffer, final Direction direction) {
+        this.buffer = buffer;
+        this.direction = direction;
+    }
+
+    /**
+     * Ensures that the buffer is at least as big as the size specified,
+     * resizing the buffer if necessary. This operation MAY change the direction
+     * of the buffer.
+     *
+     * @param requiredSize the minimum capacity that the buffer must have
+     */
+    public void ensureSize(final int requiredSize) {
+        if (buffer.capacity() < requiredSize) {
+            final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize);
+
+            // we have to read buffer so make sure the direction is correct.
+            if (direction == Direction.WRITE) {
+                buffer.flip();
+            }
+
+            // Copy from buffer to newBuffer
+            newBuffer.put(buffer);
+
+            // Swap the buffers
+            buffer = newBuffer;
+
+            // the new buffer is ready to be written to
+            direction = Direction.WRITE;
+        }
+    }
+    /** Grows the buffer if needed, switches it to WRITE layout and returns it with its limit set to its capacity. */
+    public ByteBuffer prepareForWrite(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        if (direction == Direction.READ) {
+            direction = Direction.WRITE;
+            buffer.position(buffer.limit());    // continue writing right after the current limit
+        }
+
+        buffer.limit(buffer.capacity());
+        return buffer;
+    }
+    /** Grows the buffer if needed, flips it to READ layout if it was in WRITE layout, and returns it. */
+    public ByteBuffer prepareForRead(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        if (direction == Direction.WRITE) {
+            direction = Direction.READ;
+            buffer.flip();  // limit becomes the old position and position returns to 0
+        }
+
+        return buffer;
+    }
+
+    /**
+     * Clears the contents of the buffer and sets direction to WRITE
+     */
+    public void clear() {
+        logger.debug("Clearing {}", buffer);
+        buffer.clear();
+        direction = Direction.WRITE;
+    }
+    /** Compacts the buffer, logs its state before and after at debug level, and sets direction to WRITE. */
+    public void compact() {
+        final String before = buffer.toString();
+        buffer.compact();
+        logger.debug("Before compact: {}, after: {}", before, buffer);
+        direction = Direction.WRITE;
+    }
+
+    public static enum Direction {
+
+        READ, WRITE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
new file mode 100644
index 0000000..32a3f26
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link InputStream} backed by a non-blocking {@link SocketChannel}. Reads poll the channel,
+ * sleeping briefly between attempts, until data arrives, the end of the stream is reached, or the
+ * configured timeout elapses.
+ */
+public class SocketChannelInputStream extends InputStream {
+
+    // pause between polls of the channel while it has no data to offer
+    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeoutMillis = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+    // a byte pulled off the channel by isDataAvailable() that has not been handed to a reader yet
+    private Byte bufferedByte = null;
+
+    /**
+     * Wraps the given channel, switching it to non-blocking mode.
+     *
+     * @param socketChannel the channel to read from
+     * @throws IOException if the channel cannot be switched to non-blocking mode
+     */
+    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    /**
+     * Sets how long a read may wait for data before failing.
+     *
+     * @param timeoutMillis the timeout in milliseconds
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    /**
+     * Reads a single byte, waiting for one to arrive if necessary.
+     *
+     * @return the byte as a value in the range 0-255, or -1 at end of stream
+     * @throws SocketTimeoutException if no data arrives before the timeout elapses
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting; the channel is
+     * closed and the thread's interrupt status is set
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    public int read() throws IOException {
+        if (bufferedByte != null) {
+            final int retVal = bufferedByte & 0xFF;
+            bufferedByte = null;
+            return retVal;
+        }
+
+        oneByteBuffer.clear();
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        while ((bytesRead = channel.read(oneByteBuffer)) == 0) {
+            awaitData(maxTime);
+        }
+
+        if (bytesRead == -1) {
+            return -1;
+        }
+        oneByteBuffer.flip();
+        return oneByteBuffer.get() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b}, waiting for at least one byte to arrive. If a
+     * byte was previously buffered by {@link #isDataAvailable()}, only that byte is returned.
+     *
+     * @param b the destination array
+     * @param off the index in {@code b} at which to start storing bytes
+     * @param len the maximum number of bytes to read
+     * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of stream
+     * @throws NullPointerException if {@code b} is null
+     * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not describe a range within {@code b}
+     * @throws SocketTimeoutException if no data arrives before the timeout elapses
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting; the channel is
+     * closed and the thread's interrupt status is set
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            // a zero-length request can never make progress on the channel, so polling for it
+            // would only end in a timeout; report "nothing read" immediately instead
+            return 0;
+        }
+
+        if (bufferedByte != null) {
+            final byte retVal = bufferedByte;
+            bufferedByte = null;
+            b[off] = retVal;
+            return 1;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        while ((bytesRead = channel.read(buffer)) == 0) {
+            awaitData(maxTime);
+        }
+
+        return bytesRead;
+    }
+
+    /**
+     * Reports whether a byte can be read without waiting. This may pull one byte off the channel
+     * and hold it until the next read.
+     *
+     * @return 1 if a byte is ready, otherwise 0
+     * @throws EOFException if the peer has closed the stream
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    public int available() throws IOException {
+        if (bufferedByte != null) {
+            return 1;
+        }
+
+        isDataAvailable(); // attempt to read from socket
+        return (bufferedByte == null) ? 0 : 1;
+    }
+
+    /**
+     * Makes a single non-blocking attempt to read one byte from the channel. A byte that is
+     * obtained is held and returned by the next read call.
+     *
+     * @return true if a byte is buffered and ready to be read, false if the channel had no data
+     * @throws EOFException if the peer has closed the stream
+     * @throws IOException if the channel read fails
+     */
+    public boolean isDataAvailable() throws IOException {
+        if (bufferedByte != null) {
+            return true;
+        }
+
+        oneByteBuffer.clear();
+        final int bytesRead = channel.read(oneByteBuffer);
+        if (bytesRead == -1) {
+            throw new EOFException("Peer has closed the stream");
+        }
+        if (bytesRead > 0) {
+            oneByteBuffer.flip();
+            bufferedByte = oneByteBuffer.get();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Closes the underlying socket channel.
+     *
+     * @throws java.io.IOException if closing the channel fails
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    /**
+     * Called after the channel yielded no bytes: fails if the deadline has passed, otherwise sleeps
+     * briefly so the caller can poll the channel again.
+     */
+    private void awaitData(final long maxTime) throws IOException {
+        if (System.currentTimeMillis() > maxTime) {
+            throw new SocketTimeoutException("Timed out reading from socket");
+        }
+        try {
+            TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+        } catch (final InterruptedException e) {
+            close();
+            Thread.currentThread().interrupt(); // set the interrupt status
+            throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
new file mode 100644
index 0000000..77049ad
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link OutputStream} backed by a non-blocking {@link SocketChannel}. Writes retry, sleeping
+ * briefly between attempts, until every byte has been accepted by the channel or the configured
+ * timeout passes without any byte being accepted.
+ */
+public class SocketChannelOutputStream extends OutputStream {
+
+    // pause between attempts while the channel accepts no bytes
+    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeout = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+
+    /**
+     * Wraps the given channel, switching it to non-blocking mode.
+     *
+     * @param socketChannel the channel to write to
+     * @throws IOException if the channel cannot be switched to non-blocking mode
+     */
+    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    /**
+     * Sets how long a write may go without making progress before failing.
+     *
+     * @param timeoutMillis the timeout in milliseconds
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeout = timeoutMillis;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        oneByteBuffer.clear();
+        oneByteBuffer.put((byte) b);
+        oneByteBuffer.flip();
+
+        drainToChannel(oneByteBuffer);
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        drainToChannel(ByteBuffer.wrap(b, off, len));
+    }
+
+    /**
+     * Closes the underlying SocketChannel
+     *
+     * @throws java.io.IOException if closing the channel fails
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    /**
+     * Pushes every remaining byte of {@code pending} to the channel. The deadline is pushed back
+     * each time the channel accepts at least one byte, so the timeout measures time without
+     * progress rather than total time.
+     */
+    private void drainToChannel(final ByteBuffer pending) throws IOException {
+        final int timeoutMillis = this.timeout;
+        long deadline = System.currentTimeMillis() + timeoutMillis;
+
+        while (pending.hasRemaining()) {
+            if (channel.write(pending) != 0) {
+                deadline = System.currentTimeMillis() + timeoutMillis;
+                continue;
+            }
+
+            if (System.currentTimeMillis() > deadline) {
+                throw new SocketTimeoutException("Timed out writing to socket");
+            }
+            try {
+                TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+            } catch (final InterruptedException e) {
+                close();
+                Thread.currentThread().interrupt(); // set the interrupt status
+                throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..5810488
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLSocketChannel implements Closeable {
+
+    // NOTE(review): not referenced in the portion of the class visible here; presumably the
+    // largest number of bytes handed to a single write - confirm against the write methods
+    public static final int MAX_WRITE_SIZE = 65536;
+
+    private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
+    // how long readData/writeFully sleep between attempts when the channel transfers no bytes
+    private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+    // remote endpoint; hostname and port are also used in timeout messages
+    private final String hostname;
+    private final int port;
+    private final SSLEngine engine;
+    private final SocketAddress socketAddress;
+
+    // encrypted bytes read from the channel, waiting to be unwrapped by the engine
+    private BufferStateManager streamInManager;
+    // encrypted bytes produced by the engine's wrap, waiting to be written to the channel
+    private BufferStateManager streamOutManager;
+    // decrypted application bytes produced by the engine's unwrap
+    private BufferStateManager appDataManager;
+
+    private SocketChannel channel;
+
+    // scratch array used by the single-byte read()
+    private final byte[] oneByteBuffer = new byte[1];
+
+    // milliseconds allowed for connecting, and for a read or write to go without transferring data
+    private int timeoutMillis = 30000;
+    // set to true once connect() has completed the handshake
+    private volatile boolean connected = false;
+    // true only while performHandshake() is running
+    private boolean handshaking = false;
+    // set once close() or isClosed() has shut the channel down
+    private boolean closed = false;
+    // when true, connect/readData/writeFully abort with TransmissionDisabledException;
+    // the code that sets this flag is outside the portion of the class visible here
+    private volatile boolean interrupted = false;
+
+    public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
+        this.socketAddress = new InetSocketAddress(hostname, port);
+        this.channel = SocketChannel.open();
+        this.hostname = hostname;
+        this.port = port;
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    /**
+     * Creates an SSL channel on top of a socket channel that is already connected. The remote
+     * address, host name and port are taken from the supplied channel.
+     *
+     * @param sslContext the context from which the SSL engine is created
+     * @param socketChannel a connected socket channel
+     * @param client whether the SSL engine operates in client mode
+     * @throws IllegalArgumentException if {@code socketChannel} is not connected
+     * @throws IOException if the channel's remote address cannot be obtained
+     */
+    public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
+        final boolean usable = socketChannel.isConnected();
+        if (!usable) {
+            throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+        }
+
+        this.channel = socketChannel;
+        this.socketAddress = socketChannel.getRemoteAddress();
+
+        final Socket underlyingSocket = socketChannel.socket();
+        this.hostname = underlyingSocket.getInetAddress().getHostName();
+        this.port = underlyingSocket.getPort();
+
+        final SSLEngine sslEngine = sslContext.createSSLEngine();
+        sslEngine.setUseClientMode(client);
+        sslEngine.setNeedClientAuth(true);
+        this.engine = sslEngine;
+
+        // network-side buffers are sized for SSL packets, the plaintext buffer for application data
+        final ByteBuffer inboundPackets = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+        final ByteBuffer outboundPackets = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+        final ByteBuffer plaintext = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
+
+        streamInManager = new BufferStateManager(inboundPackets);
+        streamOutManager = new BufferStateManager(outboundPackets);
+        appDataManager = new BufferStateManager(plaintext);
+    }
+
+    /**
+     * Sets the timeout applied while connecting and while waiting on reads and writes.
+     *
+     * @param millis the timeout in milliseconds
+     */
+    public void setTimeout(final int millis) {
+        timeoutMillis = millis;
+    }
+
+    /**
+     * @return the timeout, in milliseconds, applied while connecting and while waiting on reads and writes
+     */
+    public int getTimeout() {
+        return this.timeoutMillis;
+    }
+
+    /**
+     * Connects the underlying channel if it is not connected yet, then performs the SSL handshake.
+     * On success all three buffers are cleared and this channel is marked as connected. On any
+     * failure the channel is closed, both directions of the SSL engine are shut down, and the
+     * original exception is rethrown.
+     *
+     * @throws SSLHandshakeException if the handshake fails
+     * @throws SocketTimeoutException if the connection is not established within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting for the
+     * connection to be established; the thread's interrupt status is set
+     * @throws IOException if any other I/O failure occurs
+     */
+    public void connect() throws SSLHandshakeException, IOException {
+        try {
+            channel.configureBlocking(false);
+            if (!channel.isConnected()) {
+                final long startTime = System.currentTimeMillis();
+
+                if (!channel.connect(socketAddress)) {
+                    while (!channel.finishConnect()) {
+                        if (interrupted) {
+                            throw new TransmissionDisabledException();
+                        }
+                        if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                            throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
+                        }
+
+                        try {
+                            Thread.sleep(50L);
+                        } catch (final InterruptedException e) {
+                            // do not lose the interrupt: restore the flag and abort, the same way
+                            // readData and writeFully react to an interrupted wait. The catch
+                            // block below takes care of closing the channel.
+                            Thread.currentThread().interrupt();
+                            throw new ClosedByInterruptException();
+                        }
+                    }
+                }
+            }
+            engine.beginHandshake();
+
+            performHandshake();
+            logger.debug("{} Successfully completed SSL handshake", this);
+
+            streamInManager.clear();
+            streamOutManager.clear();
+            appDataManager.clear();
+
+            connected = true;
+        } catch (final Exception e) {
+            logger.error("{} Failed to connect due to {}", this, e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            closeQuietly(channel);
+            try {
+                engine.closeInbound();
+            } catch (final Exception closeFailure) {
+                // closeInbound complains when the peer never sent its close notification, which is
+                // the normal situation after a failed connect; keep it from masking the real cause
+                e.addSuppressed(closeFailure);
+            }
+            engine.closeOutbound();
+            throw e;
+        }
+    }
+
+    /**
+     * Returns the subject DN of the first certificate in the peer's certificate chain, after
+     * checking that the certificate is currently valid.
+     *
+     * @return the trimmed subject DN of the peer's certificate
+     * @throws CertificateExpiredException if the certificate has expired
+     * @throws CertificateNotYetValidException if the certificate is not yet valid
+     * @throws SSLPeerUnverifiedException if the session has no peer certificates
+     */
+    public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
+        final X509Certificate[] chain = engine.getSession().getPeerCertificateChain();
+        final boolean hasPeerCertificate = chain != null && chain.length > 0;
+        if (hasPeerCertificate) {
+            final X509Certificate peerCertificate = chain[0];
+            peerCertificate.checkValidity();
+            return peerCertificate.getSubjectDN().getName().trim();
+        }
+
+        throw new SSLPeerUnverifiedException("No certificates found");
+    }
+
+    /**
+     * Drives the SSL handshake to completion by repeatedly asking the engine what it needs next:
+     * wrapping and sending handshake data, reading and unwrapping the peer's handshake data, or
+     * running delegated tasks. The {@code handshaking} flag is true for the duration of the call.
+     *
+     * @throws SSLHandshakeException if handshake data cannot be wrapped or the stream ends mid-handshake
+     * @throws IOException if the peer closes the channel during the handshake or channel I/O fails
+     */
+    private void performHandshake() throws IOException {
+        // Handshake records carry no application payload, so an empty source is wrapped
+        final byte[] emptyMessage = new byte[0];
+        handshaking = true;
+        logger.debug("{} Performing Handshake", this);
+
+        try {
+            while (true) {
+                // NOTE(review): the SSLEngine documentation describes FINISHED as a status reported
+                // only on wrap/unwrap results, so the loop presumably ends via NOT_HANDSHAKING - confirm
+                switch (engine.getHandshakeStatus()) {
+                    case FINISHED:
+                        return;
+                    case NEED_WRAP: {
+                        final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+
+                        // NOTE(review): this buffer receives wrapped network data but is sized with
+                        // getApplicationBufferSize - confirm whether getPacketBufferSize was intended
+                        final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
+                        if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                            // NOTE(review): the retry requests the same size as before; whether that
+                            // yields more room depends on BufferStateManager.ensureSize - confirm
+                            streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+                            continue;
+                        }
+
+                        if (wrapHelloResult.getStatus() != Status.OK) {
+                            throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
+                                    + wrapHelloResult.toString());
+                        }
+
+                        logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
+
+                        // flip the outbound buffer and push the wrapped handshake bytes to the peer
+                        final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+                        final int bytesToSend = readableStreamOut.remaining();
+                        writeFully(readableStreamOut);
+                        logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
+
+                        streamOutManager.clear();
+                    }
+                    continue;
+                    case NEED_UNWRAP: {
+                        final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
+                        final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        // Try to unwrap whatever has already been received from the peer
+                        logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
+                        SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
+                        logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
+
+                        if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                            // not enough data for a complete record yet: read more from the channel
+                            // and let the next loop iteration unwrap again
+                            final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                            final int bytesRead = readData(writableDataIn);
+                            if (bytesRead > 0) {
+                                logger.trace("{} Read {} bytes for handshake", this, bytesRead);
+                            }
+
+                            if (bytesRead < 0) {
+                                throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
+                            }
+                        } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
+                            throw new IOException("Channel was closed by peer during handshake");
+                        } else {
+                            // a record was consumed: keep any leftover network bytes and drop
+                            // whatever the unwrap placed in the application buffer
+                            streamInManager.compact();
+                            appDataManager.clear();
+                        }
+                    }
+                    // leaves the switch only; the enclosing while loop then re-checks the status
+                    break;
+                    case NEED_TASK:
+                        performTasks();
+                        continue;
+                    case NOT_HANDSHAKING:
+                        return;
+                }
+            }
+        } finally {
+            handshaking = false;
+        }
+    }
+
+    /**
+     * Runs, on the calling thread, every delegated task the SSL engine currently has pending.
+     */
+    private void performTasks() {
+        for (Runnable task = engine.getDelegatedTask(); task != null; task = engine.getDelegatedTask()) {
+            task.run();
+        }
+    }
+
+    /**
+     * Closes the given resource, discarding any exception raised while doing so.
+     *
+     * @param closeable the resource to close
+     */
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            closeable.close();
+        } catch (final Exception ignored) {
+            // best-effort cleanup: there is nothing useful to do if closing fails
+        }
+    }
+
+    /**
+     * Reads from the channel into {@code dest}, polling until at least one byte arrives, the end
+     * of the stream is reached, or the timeout elapses.
+     *
+     * @param dest the buffer to read into
+     * @return the value returned by the channel read (bytes read, or negative at end of stream), or
+     * 0 if {@code dest} has no room left
+     * @throws SocketTimeoutException if nothing is read before the timeout elapses
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting; this channel is
+     * closed and the thread's interrupt status is set
+     * @throws IOException if the channel read fails
+     */
+    private int readData(final ByteBuffer dest) throws IOException {
+        final long began = System.currentTimeMillis();
+
+        for (;;) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            if (!dest.hasRemaining()) {
+                return 0;
+            }
+
+            final int count = channel.read(dest);
+            if (count != 0) {
+                logger.trace("{} Read {} bytes", this, count);
+                return count;
+            }
+
+            // nothing available yet: give up once the timeout has passed, otherwise pause and retry
+            if (System.currentTimeMillis() > began + timeoutMillis) {
+                throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
+            }
+            try {
+                TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+            } catch (InterruptedException e) {
+                close();
+                Thread.currentThread().interrupt(); // set the interrupt status
+                throw new ClosedByInterruptException();
+            }
+        }
+    }
+
+    /**
+     * Encrypts everything readable in {@code src} and writes the encrypted bytes to the channel.
+     * Each successful wrap is written out completely before the next one is attempted. The method
+     * stops at the first wrap whose status is not {@code OK}.
+     *
+     * @param src the manager holding the plaintext to send
+     * @return {@code Status.OK} if all data was encrypted and written, or if {@code src} held no
+     * data to begin with; otherwise the status of the wrap that did not succeed
+     * @throws IOException if wrapping or writing to the channel fails
+     */
+    private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
+        final ByteBuffer buff = src.prepareForRead(0);
+        final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+        logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+
+        // stays OK when there is nothing to encrypt, instead of dereferencing a result that was never produced
+        Status status = Status.OK;
+        while (buff.remaining() > 0) {
+            final SSLEngineResult result = engine.wrap(buff, outBuff);
+            status = result.getStatus();
+            if (status != Status.OK) {
+                return status;
+            }
+
+            final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
+            writeFully(readableOutBuff);
+            streamOutManager.clear();
+        }
+
+        return status;
+    }
+
+    /**
+     * Writes every remaining byte of {@code src} to the channel, pausing briefly whenever the
+     * channel accepts nothing. The timeout is measured from the last moment a byte was accepted.
+     *
+     * @param src the bytes to write
+     * @throws SocketTimeoutException if the channel accepts no bytes for longer than the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting; this channel is
+     * closed and the thread's interrupt status is set
+     * @throws IOException if the channel write fails
+     */
+    private void writeFully(final ByteBuffer src) throws IOException {
+        long lastProgressTime = System.currentTimeMillis();
+        int totalWritten = 0;
+
+        while (src.hasRemaining()) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            final int written = channel.write(src);
+            totalWritten += written;
+
+            final long now = System.currentTimeMillis();
+            if (written > 0) {
+                lastProgressTime = now;
+                continue;
+            }
+
+            // the channel took nothing: give up once stalled past the timeout, otherwise pause and retry
+            if (now > lastProgressTime + timeoutMillis) {
+                throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
+            }
+            try {
+                TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+            } catch (final InterruptedException e) {
+                close();
+                Thread.currentThread().interrupt(); // set the interrupt status
+                throw new ClosedByInterruptException();
+            }
+        }
+
+        logger.trace("{} Wrote {} bytes", this, totalWritten);
+    }
+
+    /**
+     * Determines whether this channel is closed, probing the socket to detect a closure initiated
+     * by the peer. This is not a passive check: it performs a read on the channel and, if bytes
+     * arrive, unwraps them into the application data buffer. When the end of the stream, a read
+     * failure, or a closure message from the peer is detected, the socket and channel are closed.
+     *
+     * @return true if the channel is closed (or was closed by this call), false otherwise
+     */
+    public boolean isClosed() {
+        if (closed) {
+            return true;
+        }
+        // need to detect if peer has sent closure handshake...if so the answer is true
+        final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        int readCount = 0;
+        try {
+            readCount = channel.read(writableInBuffer);
+        } catch (IOException e) {
+            logger.error("{} Failed to readData due to {}", new Object[]{this, e});
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            readCount = -1; // treat the condition same as if End of Stream
+        }
+        if (readCount == 0) {
+            // nothing pending on the socket, so there is no sign of a closure
+            return false;
+        }
+        if (readCount > 0) {
+            logger.trace("{} Read {} bytes", this, readCount);
+
+            // unwrap what was received to find out whether it is the peer's closure message
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            try {
+                SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+                logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+                if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+                    // Drain the incoming TCP buffer
+                    final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+                    int bytesDiscarded = channel.read(discardBuffer);
+                    while (bytesDiscarded > 0) {
+                        discardBuffer.clear();
+                        bytesDiscarded = channel.read(discardBuffer);
+                    }
+                    engine.closeInbound();
+                } else {
+                    // ordinary data: keep unconsumed network bytes for the next read and report open
+                    streamInManager.compact();
+                    return false;
+                }
+            } catch (IOException e) {
+                // a failure here falls through to the shutdown below
+                logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
+                if (logger.isDebugEnabled()) {
+                    logger.error("", e);
+                }
+            }
+        }
+        // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
+        // so go ahead and close down the channel
+        closeQuietly(channel.socket());
+        closeQuietly(channel);
+        closed = true;
+        return true;
+    }
+
+    /**
+     * Closes this channel: shuts down the outbound side of the SSL engine, sends the resulting
+     * closure message to the peer, then drains any unread incoming bytes and closes the socket and
+     * channel. The drain and the socket/channel close run even if sending the closure message
+     * fails. Calling this on an already closed channel does nothing.
+     *
+     * @throws IOException if the engine does not report the CLOSED status when wrapping the closure
+     * message, or if writing that message to the channel fails
+     */
+    @Override
+    public void close() throws IOException {
+        logger.debug("{} Closing Connection", this);
+        if (channel == null) {
+            return;
+        }
+
+        if (closed) {
+            return;
+        }
+
+        try {
+            engine.closeOutbound();
+
+            // wrapping an empty message after closeOutbound produces the closure message for the peer
+            final byte[] emptyMessage = new byte[0];
+
+            final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+            final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
+
+            if (handshakeResult.getStatus() != Status.CLOSED) {
+                throw new IOException("Invalid close state - will not send network data");
+            }
+
+            final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+            writeFully(readableStreamOut);
+        } finally {
+            // Drain the incoming TCP buffer
+            final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+            try {
+                int bytesDiscarded = channel.read(discardBuffer);
+                while (bytesDiscarded > 0) {
+                    discardBuffer.clear();
+                    bytesDiscarded = channel.read(discardBuffer);
+                }
+            } catch (Exception e) {
+                // draining is best-effort; failures here must not prevent the close below
+            }
+
+            closeQuietly(channel.socket());
+            closeQuietly(channel);
+            closed = true;
+        }
+    }
+
+    /**
+     * Copies decrypted bytes that are already sitting in the application data buffer into the
+     * caller's array.
+     *
+     * @param buffer the destination array
+     * @param offset the index in {@code buffer} at which to start storing bytes
+     * @param len the maximum number of bytes to copy
+     * @return the number of bytes copied, or 0 if the application data buffer was empty
+     */
+    private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
+        final ByteBuffer decrypted = appDataManager.prepareForRead(1);
+
+        final int availableBefore = decrypted.remaining();
+        if (availableBefore <= 0) {
+            return 0;
+        }
+
+        final int bytesToCopy = Math.min(len, decrypted.remaining());
+        decrypted.get(buffer, offset, bytesToCopy);
+
+        // derive the count from the buffer itself rather than trusting the requested amount
+        final int bytesCopied = availableBefore - decrypted.remaining();
+        logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
+                new Object[]{this, bytesToCopy, bytesCopied});
+        return bytesCopied;
+    }
+
+    /**
+     * Reports how many bytes are buffered locally, counting both decrypted application bytes and
+     * encrypted bytes not yet unwrapped. If nothing is buffered, one non-blocking read from the
+     * channel is attempted before answering.
+     *
+     * @return the number of buffered bytes, or 0 if none are buffered and none could be read
+     * @throws IOException if reading from the channel fails
+     */
+    public int available() throws IOException {
+        final int alreadyBuffered = appDataManager.prepareForRead(1).remaining()
+                + streamInManager.prepareForRead(1).remaining();
+        if (alreadyBuffered > 0) {
+            return alreadyBuffered;
+        }
+
+        if (!isDataAvailable()) {
+            return 0;
+        }
+
+        // the probe pulled bytes off the channel, so count what the buffers hold now
+        return appDataManager.prepareForRead(1).remaining()
+                + streamInManager.prepareForRead(1).remaining();
+    }
+
+    /**
+     * Checks whether any bytes are buffered locally and, if not, makes one attempt to read from
+     * the channel into the incoming stream buffer.
+     *
+     * @return true if bytes were already buffered or the channel read returned at least one byte
+     * @throws IOException if reading from the channel fails
+     */
+    public boolean isDataAvailable() throws IOException {
+        final boolean appDataPending = appDataManager.prepareForRead(1).remaining() > 0;
+        final boolean streamDataPending = streamInManager.prepareForRead(1).remaining() > 0;
+        if (appDataPending || streamDataPending) {
+            return true;
+        }
+
+        final ByteBuffer incoming = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        return channel.read(incoming) > 0;
+    }
+
+    /**
+     * Reads a single byte by delegating to {@link #read(byte[])} with a one-byte scratch array.
+     *
+     * @return the byte as a value in the range 0-255, or -1 if the delegate reports end of stream
+     * @throws IOException if the underlying read fails
+     */
+    public int read() throws IOException {
+        final boolean endOfStream = read(oneByteBuffer) == -1;
+        return endOfStream ? -1 : (oneByteBuffer[0] & 0xFF);
+    }
+
+    /**
+     * Reads into the whole of the given array; equivalent to
+     * {@code read(buffer, 0, buffer.length)}.
+     *
+     * @param buffer the array to fill
+     * @return whatever {@link #read(byte[], int, int)} returns
+     * @throws IOException if the underlying read fails
+     */
+    public int read(final byte[] buffer) throws IOException {
+        final int capacity = buffer.length;
+        return read(buffer, 0, capacity);
+    }
+
+    /**
+     * Reads decrypted application data into the given array, connecting first
+     * if the channel has not been connected yet.
+     * <p>
+     * Bytes already sitting in the application-data buffer are handed out
+     * first. Otherwise the application-data buffer is cleared and the method
+     * loops: it unwraps the inbound stream buffer with the SSLEngine and, each
+     * time the engine reports BUFFER_UNDERFLOW, calls {@code readData} to
+     * obtain more bytes before unwrapping again.
+     * <p>
+     * NOTE(review): when {@code len} is 0 the first copy appears to return 0,
+     * so execution falls through to {@code appDataManager.clear()} and the
+     * unwrap loop - confirm that callers never pass a zero length.
+     *
+     * @param buffer the array to copy decrypted bytes into
+     * @param offset the index in {@code buffer} at which to start storing bytes
+     * @param len the maximum number of bytes to copy
+     * @return the number of bytes copied, or -1 if {@code readData} returned a
+     * negative count
+     * @throws IOException if the engine reports CLOSED, if an OK unwrap left
+     * nothing to copy, or if an underlying call fails; an
+     * SSLHandshakeException is thrown if the engine reports BUFFER_OVERFLOW
+     */
+    public int read(final byte[] buffer, final int offset, final int len) throws IOException {
+        logger.debug("{} Reading up to {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        // Serve leftover decrypted bytes from a previous unwrap before touching the engine.
+        int copied = copyFromAppDataBuffer(buffer, offset, len);
+        if (copied > 0) {
+            return copied;
+        }
+
+        appDataManager.clear();
+
+        while (true) {
+            // prepare buffers and call unwrap
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            SSLEngineResult unwrapResponse = null;
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+            logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+
+            switch (unwrapResponse.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
+                case BUFFER_UNDERFLOW: {
+//                appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
+
+                    // The engine needs more input: fetch more bytes into the inbound buffer, then unwrap again.
+                    final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                    final int bytesRead = readData(writableInBuffer);
+                    if (bytesRead < 0) {
+                        return -1;
+                    }
+
+                    continue;
+                }
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case OK: {
+                    // NOTE(review): an OK unwrap that produces no application bytes is reported as a
+                    // decryption failure here - confirm the engine cannot legitimately do that.
+                    copied = copyFromAppDataBuffer(buffer, offset, len);
+                    if (copied == 0) {
+                        throw new IOException("Failed to decrypt data");
+                    }
+                    streamInManager.compact();
+                    return copied;
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes a single byte, taken from the low-order eight bits of
+     * {@code data}, by delegating to {@link #write(byte[], int, int)}.
+     *
+     * @param data the value whose low-order byte is written
+     * @throws IOException if the underlying write fails
+     */
+    public void write(final int data) throws IOException {
+        final byte[] single = {(byte) data};
+        write(single, 0, 1);
+    }
+
+    /**
+     * Writes the entire array; equivalent to
+     * {@code write(data, 0, data.length)}.
+     *
+     * @param data the bytes to write
+     * @throws IOException if the underlying write fails
+     */
+    public void write(final byte[] data) throws IOException {
+        final int total = data.length;
+        write(data, 0, total);
+    }
+
+    /**
+     * Encrypts and writes {@code len} bytes of {@code data} beginning at
+     * {@code offset}, connecting first if the channel has not been connected
+     * yet. The request is split into chunks of at most MAX_WRITE_SIZE bytes;
+     * each chunk is wrapped in a ByteBuffer and passed to
+     * {@code encryptAndWriteFully}.
+     *
+     * @param data the array holding the bytes to write
+     * @param offset the index of the first byte in {@code data} to write
+     * @param len the number of bytes to write
+     * @throws IOException if the engine reports CLOSED or an underlying call
+     * fails
+     */
+    public void write(final byte[] data, final int offset, final int len) throws IOException {
+        logger.debug("{} Writing {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int iterations = len / MAX_WRITE_SIZE;
+        if (len % MAX_WRITE_SIZE > 0) {
+            iterations++;
+        }
+
+        for (int i = 0; i < iterations; i++) {
+            streamOutManager.clear();
+            // Progress is measured from the start of the request. The caller's offset only shifts
+            // where the chunk begins in the array; it must not be subtracted from the byte count,
+            // otherwise a non-zero offset truncates the chunk or yields a negative length.
+            final int chunkStart = i * MAX_WRITE_SIZE;
+            final int itrOffset = offset + chunkStart;
+            final int itrLen = Math.min(len - chunkStart, MAX_WRITE_SIZE);
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
+
+            final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
+            final Status status = encryptAndWriteFully(buffMan);
+            switch (status) {
+                case BUFFER_OVERFLOW:
+                    // NOTE(review): the buffers are enlarged but the loop then advances to the next
+                    // chunk; confirm encryptAndWriteFully cannot leave part of this chunk unwritten.
+                    streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+                    appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+                    continue;
+                case OK:
+                    continue;
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case BUFFER_UNDERFLOW:
+                    throw new AssertionError("Got Buffer Underflow but should not have...");
+            }
+        }
+    }
+
+    /**
+     * Sets this channel's {@code interrupted} flag to {@code true}.
+     * NOTE(review): presumably the flag is polled by blocking operations
+     * elsewhere in this class so they can stop early - confirm against the
+     * code that reads it.
+     */
+    public void interrupt() {
+        interrupted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..154bd08
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} view over an {@link SSLSocketChannel}. Every
+ * operation is forwarded to the wrapped channel; this class holds no state of
+ * its own beyond the channel reference.
+ */
+public class SSLSocketChannelInputStream extends InputStream {
+
+    private final SSLSocketChannel sslChannel;
+
+    /**
+     * @param channel the channel that all reads are delegated to
+     */
+    public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+        sslChannel = channel;
+    }
+
+    /**
+     * Reads one byte from the channel.
+     *
+     * @return the value returned by the channel's single-byte read
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    public int read() throws IOException {
+        return sslChannel.read();
+    }
+
+    /**
+     * Reads from the channel into the given array.
+     *
+     * @param target the array to read into
+     * @return the value returned by the channel
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    public int read(final byte[] target) throws IOException {
+        return sslChannel.read(target);
+    }
+
+    /**
+     * Reads up to {@code length} bytes from the channel into the given array.
+     *
+     * @param target the array to read into
+     * @param start the index at which to start storing bytes
+     * @param length the maximum number of bytes to read
+     * @return the value returned by the channel
+     * @throws IOException if the channel read fails
+     */
+    @Override
+    public int read(final byte[] target, final int start, final int length) throws IOException {
+        return sslChannel.read(target, start, length);
+    }
+
+    /**
+     * Closes the wrapped SSLSocketChannel; as a consequence the matching
+     * OutputStream and the connection are closed as well.
+     */
+    @Override
+    public void close() throws IOException {
+        sslChannel.close();
+    }
+
+    /**
+     * @return the channel's own estimate of available bytes
+     * @throws IOException if the channel cannot determine availability
+     */
+    @Override
+    public int available() throws IOException {
+        return sslChannel.available();
+    }
+
+    /**
+     * @return {@code true} if {@link #available()} reports more than zero
+     * bytes
+     * @throws IOException if {@link #available()} fails
+     */
+    public boolean isDataAvailable() throws IOException {
+        final int ready = available();
+        return ready > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..ce4e420
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An {@link OutputStream} view over an {@link SSLSocketChannel}. Every
+ * operation is forwarded to the wrapped channel; this class holds no state of
+ * its own beyond the channel reference.
+ */
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+    private final SSLSocketChannel sslChannel;
+
+    /**
+     * @param channel the channel that all writes are delegated to
+     */
+    public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+        sslChannel = channel;
+    }
+
+    /**
+     * Writes a single byte to the channel.
+     *
+     * @param value the value handed to the channel's single-byte write
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    public void write(final int value) throws IOException {
+        sslChannel.write(value);
+    }
+
+    /**
+     * Writes the given array to the channel.
+     *
+     * @param source the bytes to write
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    public void write(final byte[] source) throws IOException {
+        sslChannel.write(source);
+    }
+
+    /**
+     * Writes {@code length} bytes of the given array to the channel.
+     *
+     * @param source the array holding the bytes to write
+     * @param start the index of the first byte to write
+     * @param length the number of bytes to write
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    public void write(final byte[] source, final int start, final int length) throws IOException {
+        sslChannel.write(source, start, length);
+    }
+
+    /**
+     * Closes the wrapped SSLSocketChannel; as a consequence the matching
+     * InputStream and the connection are closed as well.
+     */
+    @Override
+    public void close() throws IOException {
+        sslChannel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
new file mode 100644
index 0000000..aaf37ea
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.InputStream;
+
+/**
+ * A thin subclass of {@link java.io.BufferedInputStream} that re-exposes its
+ * two constructors under the {@code org.apache.nifi.stream.io} package and adds
+ * no behavior of its own.
+ * <p>
+ * NOTE(review): the stated intent of this class is a buffered stream without
+ * synchronization, for single-threaded use with better performance. As
+ * written, nothing is overridden, so every method - and whatever locking it
+ * performs - is inherited unchanged from {@link java.io.BufferedInputStream}.
+ * Do not rely on this class being faster than the JDK class until it carries
+ * its own implementation.
+ */
+public class BufferedInputStream extends java.io.BufferedInputStream {
+
+    /**
+     * Wraps the given stream using the superclass's default buffer size.
+     *
+     * @param source the stream to buffer
+     */
+    public BufferedInputStream(final InputStream source) {
+        super(source);
+    }
+
+    /**
+     * Wraps the given stream using a buffer of the requested size.
+     *
+     * @param source the stream to buffer
+     * @param bufferSize the buffer size, passed straight to the superclass
+     */
+    public BufferedInputStream(final InputStream source, final int bufferSize) {
+        super(source, bufferSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
new file mode 100644
index 0000000..eadfcab
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * This class is a slight modification of the
+ * {@link java.io.BufferedOutputStream} class. This implementation differs in
+ * that it does not mark methods as synchronized. This means that this class is
+ * not suitable for writing by multiple concurrent threads. However, the removal
+ * of the synchronized keyword results in potentially much better performance.
+ */
+public class BufferedOutputStream extends FilterOutputStream {
+
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte[] buf;
+
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range {@code 0} through {@code buf.length}; elements {@code buf[0]}
+     * through {@code buf[count-1]} contain valid byte data.
+     */
+    protected int count;
+
+    /**
+     * Creates a new buffered output stream that writes to the specified
+     * underlying output stream using a buffer of 8192 bytes.
+     *
+     * @param out the underlying output stream.
+     */
+    public BufferedOutputStream(OutputStream out) {
+        this(out, 8192);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     *
+     * @param out the underlying output stream.
+     * @param size the buffer size.
+     * @throws IllegalArgumentException if size &lt;= 0.
+     */
+    public BufferedOutputStream(OutputStream out, int size) {
+        super(out);
+        if (size <= 0) {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Writes any buffered bytes to the underlying stream and resets the
+     * buffer; does nothing when the buffer is empty. The underlying stream
+     * itself is not flushed.
+     */
+    private void flushBuffer() throws IOException {
+        if (count > 0) {
+            out.write(buf, 0, count);
+            count = 0;
+        }
+    }
+
+    /**
+     * Writes the specified byte to this buffered output stream, first
+     * emptying the buffer into the underlying stream if it is full.
+     *
+     * @param b the byte to be written.
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public void write(int b) throws IOException {
+        if (count >= buf.length) {
+            flushBuffer();
+        }
+        buf[count++] = (byte) b;
+    }
+
+    /**
+     * Writes {@code len} bytes from the specified byte array starting at
+     * offset {@code off} to this buffered output stream.
+     *
+     * <p>
+     * Ordinarily this method stores bytes from the given array into this
+     * stream's buffer, flushing the buffer to the underlying output stream as
+     * needed. If the requested length is at least as large as this stream's
+     * buffer, however, then this method will flush the buffer and write the
+     * bytes directly to the underlying output stream. Thus redundant
+     * {@code BufferedOutputStream}s will not copy data unnecessarily.
+     *
+     * @param b the data.
+     * @param off the start offset in the data.
+     * @param len the number of bytes to write.
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (len >= buf.length) {
+            /* If the request length exceeds the size of the output buffer,
+             flush the output buffer and then write the data directly.
+             In this way buffered streams will cascade harmlessly. */
+            flushBuffer();
+            out.write(b, off, len);
+            return;
+        }
+        // Flush only when the bytes do not fit in the remaining space. Data that exactly
+        // fills the buffer is kept, which avoids a needless write to the underlying stream.
+        if (len > buf.length - count) {
+            flushBuffer();
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Flushes this buffered output stream. This forces any buffered output
+     * bytes to be written out to the underlying output stream, which is then
+     * flushed as well.
+     *
+     * @throws IOException if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void flush() throws IOException {
+        flushBuffer();
+        out.flush();
+    }
+}