You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:04:27 UTC

[38/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made all changes identified by adam, mark, joey to prep for a cleaner build

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
deleted file mode 100644
index 5810488..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
-import javax.security.cert.X509Certificate;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.io.socket.BufferStateManager;
-import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSLSocketChannel implements Closeable {
-
-    public static final int MAX_WRITE_SIZE = 65536;
-
-    private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
-    private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
-
-    private final String hostname;
-    private final int port;
-    private final SSLEngine engine;
-    private final SocketAddress socketAddress;
-
-    private BufferStateManager streamInManager;
-    private BufferStateManager streamOutManager;
-    private BufferStateManager appDataManager;
-
-    private SocketChannel channel;
-
-    private final byte[] oneByteBuffer = new byte[1];
-
-    private int timeoutMillis = 30000;
-    private volatile boolean connected = false;
-    private boolean handshaking = false;
-    private boolean closed = false;
-    private volatile boolean interrupted = false;
-
-    public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
-        this.socketAddress = new InetSocketAddress(hostname, port);
-        this.channel = SocketChannel.open();
-        this.hostname = hostname;
-        this.port = port;
-        this.engine = sslContext.createSSLEngine();
-        this.engine.setUseClientMode(client);
-        engine.setNeedClientAuth(true);
-
-        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
-        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
-        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
-    }
-
-    public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
-        if (!socketChannel.isConnected()) {
-            throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
-        }
-
-        this.channel = socketChannel;
-
-        this.socketAddress = socketChannel.getRemoteAddress();
-        final Socket socket = socketChannel.socket();
-        this.hostname = socket.getInetAddress().getHostName();
-        this.port = socket.getPort();
-
-        this.engine = sslContext.createSSLEngine();
-        this.engine.setUseClientMode(client);
-        engine.setNeedClientAuth(true);
-
-        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
-        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
-        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
-    }
-
-    public void setTimeout(final int millis) {
-        this.timeoutMillis = millis;
-    }
-
-    public int getTimeout() {
-        return timeoutMillis;
-    }
-
-    public void connect() throws SSLHandshakeException, IOException {
-        try {
-            channel.configureBlocking(false);
-            if (!channel.isConnected()) {
-                final long startTime = System.currentTimeMillis();
-
-                if (!channel.connect(socketAddress)) {
-                    while (!channel.finishConnect()) {
-                        if (interrupted) {
-                            throw new TransmissionDisabledException();
-                        }
-                        if (System.currentTimeMillis() > startTime + timeoutMillis) {
-                            throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
-                        }
-
-                        try {
-                            Thread.sleep(50L);
-                        } catch (final InterruptedException e) {
-                        }
-                    }
-                }
-            }
-            engine.beginHandshake();
-
-            performHandshake();
-            logger.debug("{} Successfully completed SSL handshake", this);
-
-            streamInManager.clear();
-            streamOutManager.clear();
-            appDataManager.clear();
-
-            connected = true;
-        } catch (final Exception e) {
-            logger.error("{} Failed to connect due to {}", this, e);
-            if (logger.isDebugEnabled()) {
-                logger.error("", e);
-            }
-            closeQuietly(channel);
-            engine.closeInbound();
-            engine.closeOutbound();
-            throw e;
-        }
-    }
-
-    public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
-        final X509Certificate[] certs = engine.getSession().getPeerCertificateChain();
-        if (certs == null || certs.length == 0) {
-            throw new SSLPeerUnverifiedException("No certificates found");
-        }
-
-        final X509Certificate cert = certs[0];
-        cert.checkValidity();
-        return cert.getSubjectDN().getName().trim();
-    }
-
-    private void performHandshake() throws IOException {
-        // Generate handshake message
-        final byte[] emptyMessage = new byte[0];
-        handshaking = true;
-        logger.debug("{} Performing Handshake", this);
-
-        try {
-            while (true) {
-                switch (engine.getHandshakeStatus()) {
-                    case FINISHED:
-                        return;
-                    case NEED_WRAP: {
-                        final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
-
-                        final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-
-                        final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
-                        if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
-                            streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-                            continue;
-                        }
-
-                        if (wrapHelloResult.getStatus() != Status.OK) {
-                            throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
-                                    + wrapHelloResult.toString());
-                        }
-
-                        logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
-
-                        final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
-                        final int bytesToSend = readableStreamOut.remaining();
-                        writeFully(readableStreamOut);
-                        logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
-
-                        streamOutManager.clear();
-                    }
-                    continue;
-                    case NEED_UNWRAP: {
-                        final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
-                        final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-
-                        // Read handshake response from other side
-                        logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
-                        SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
-                        logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
-
-                        if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
-                            final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
-                            final int bytesRead = readData(writableDataIn);
-                            if (bytesRead > 0) {
-                                logger.trace("{} Read {} bytes for handshake", this, bytesRead);
-                            }
-
-                            if (bytesRead < 0) {
-                                throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
-                            }
-                        } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
-                            throw new IOException("Channel was closed by peer during handshake");
-                        } else {
-                            streamInManager.compact();
-                            appDataManager.clear();
-                        }
-                    }
-                    break;
-                    case NEED_TASK:
-                        performTasks();
-                        continue;
-                    case NOT_HANDSHAKING:
-                        return;
-                }
-            }
-        } finally {
-            handshaking = false;
-        }
-    }
-
-    private void performTasks() {
-        Runnable runnable;
-        while ((runnable = engine.getDelegatedTask()) != null) {
-            runnable.run();
-        }
-    }
-
-    private void closeQuietly(final Closeable closeable) {
-        try {
-            closeable.close();
-        } catch (final Exception e) {
-        }
-    }
-
-    private int readData(final ByteBuffer dest) throws IOException {
-        final long startTime = System.currentTimeMillis();
-
-        while (true) {
-            if (interrupted) {
-                throw new TransmissionDisabledException();
-            }
-
-            if (dest.remaining() == 0) {
-                return 0;
-            }
-
-            final int readCount = channel.read(dest);
-
-            if (readCount == 0) {
-                if (System.currentTimeMillis() > startTime + timeoutMillis) {
-                    throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
-                }
-                try {
-                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
-                } catch (InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException();
-                }
-
-                continue;
-            }
-
-            logger.trace("{} Read {} bytes", this, readCount);
-            return readCount;
-        }
-    }
-
-    private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
-        SSLEngineResult result = null;
-
-        final ByteBuffer buff = src.prepareForRead(0);
-        final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-
-        logger.trace("{} Encrypting {} bytes", this, buff.remaining());
-        while (buff.remaining() > 0) {
-            result = engine.wrap(buff, outBuff);
-            if (result.getStatus() == Status.OK) {
-                final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
-                writeFully(readableOutBuff);
-                streamOutManager.clear();
-            } else {
-                return result.getStatus();
-            }
-        }
-
-        return result.getStatus();
-    }
-
-    private void writeFully(final ByteBuffer src) throws IOException {
-        long lastByteWrittenTime = System.currentTimeMillis();
-
-        int bytesWritten = 0;
-        while (src.hasRemaining()) {
-            if (interrupted) {
-                throw new TransmissionDisabledException();
-            }
-
-            final int written = channel.write(src);
-            bytesWritten += written;
-            final long now = System.currentTimeMillis();
-            if (written > 0) {
-                lastByteWrittenTime = now;
-            } else {
-                if (now > lastByteWrittenTime + timeoutMillis) {
-                    throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
-                }
-                try {
-                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
-                } catch (final InterruptedException e) {
-                    close();
-                    Thread.currentThread().interrupt(); // set the interrupt status
-                    throw new ClosedByInterruptException();
-                }
-            }
-        }
-
-        logger.trace("{} Wrote {} bytes", this, bytesWritten);
-    }
-
-    public boolean isClosed() {
-        if (closed) {
-            return true;
-        }
-        // need to detect if peer has sent closure handshake...if so the answer is true
-        final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
-        int readCount = 0;
-        try {
-            readCount = channel.read(writableInBuffer);
-        } catch (IOException e) {
-            logger.error("{} Failed to readData due to {}", new Object[]{this, e});
-            if (logger.isDebugEnabled()) {
-                logger.error("", e);
-            }
-            readCount = -1; // treat the condition same as if End of Stream
-        }
-        if (readCount == 0) {
-            return false;
-        }
-        if (readCount > 0) {
-            logger.trace("{} Read {} bytes", this, readCount);
-
-            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
-            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-            try {
-                SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
-                logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
-                if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
-                    // Drain the incoming TCP buffer
-                    final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
-                    int bytesDiscarded = channel.read(discardBuffer);
-                    while (bytesDiscarded > 0) {
-                        discardBuffer.clear();
-                        bytesDiscarded = channel.read(discardBuffer);
-                    }
-                    engine.closeInbound();
-                } else {
-                    streamInManager.compact();
-                    return false;
-                }
-            } catch (IOException e) {
-                logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
-                if (logger.isDebugEnabled()) {
-                    logger.error("", e);
-                }
-            }
-        }
-        // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
-        // so go ahead and close down the channel
-        closeQuietly(channel.socket());
-        closeQuietly(channel);
-        closed = true;
-        return true;
-    }
-
-    @Override
-    public void close() throws IOException {
-        logger.debug("{} Closing Connection", this);
-        if (channel == null) {
-            return;
-        }
-
-        if (closed) {
-            return;
-        }
-
-        try {
-            engine.closeOutbound();
-
-            final byte[] emptyMessage = new byte[0];
-
-            final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
-            final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-            final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
-
-            if (handshakeResult.getStatus() != Status.CLOSED) {
-                throw new IOException("Invalid close state - will not send network data");
-            }
-
-            final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
-            writeFully(readableStreamOut);
-        } finally {
-            // Drain the incoming TCP buffer
-            final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
-            try {
-                int bytesDiscarded = channel.read(discardBuffer);
-                while (bytesDiscarded > 0) {
-                    discardBuffer.clear();
-                    bytesDiscarded = channel.read(discardBuffer);
-                }
-            } catch (Exception e) {
-            }
-
-            closeQuietly(channel.socket());
-            closeQuietly(channel);
-            closed = true;
-        }
-    }
-
-    private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
-        // If any data already exists in the application data buffer, copy it to the buffer.
-        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
-
-        final int appDataRemaining = appDataBuffer.remaining();
-        if (appDataRemaining > 0) {
-            final int bytesToCopy = Math.min(len, appDataBuffer.remaining());
-            appDataBuffer.get(buffer, offset, bytesToCopy);
-
-            final int bytesCopied = appDataRemaining - appDataBuffer.remaining();
-            logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
-                    new Object[]{this, bytesToCopy, bytesCopied});
-            return bytesCopied;
-        }
-        return 0;
-    }
-
-    public int available() throws IOException {
-        ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
-        ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
-        final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining();
-        if (buffered > 0) {
-            return buffered;
-        }
-
-        final boolean wasAbleToRead = isDataAvailable();
-        if (!wasAbleToRead) {
-            return 0;
-        }
-
-        appDataBuffer = appDataManager.prepareForRead(1);
-        streamDataBuffer = streamInManager.prepareForRead(1);
-        return appDataBuffer.remaining() + streamDataBuffer.remaining();
-    }
-
-    public boolean isDataAvailable() throws IOException {
-        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
-        final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
-
-        if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) {
-            return true;
-        }
-
-        final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
-        final int bytesRead = channel.read(writableBuffer);
-        return (bytesRead > 0);
-    }
-
-    public int read() throws IOException {
-        final int bytesRead = read(oneByteBuffer);
-        if (bytesRead == -1) {
-            return -1;
-        }
-        return oneByteBuffer[0] & 0xFF;
-    }
-
-    public int read(final byte[] buffer) throws IOException {
-        return read(buffer, 0, buffer.length);
-    }
-
-    public int read(final byte[] buffer, final int offset, final int len) throws IOException {
-        logger.debug("{} Reading up to {} bytes of data", this, len);
-
-        if (!connected) {
-            connect();
-        }
-
-        int copied = copyFromAppDataBuffer(buffer, offset, len);
-        if (copied > 0) {
-            return copied;
-        }
-
-        appDataManager.clear();
-
-        while (true) {
-            // prepare buffers and call unwrap
-            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
-            SSLEngineResult unwrapResponse = null;
-            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-            unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
-            logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
-
-            switch (unwrapResponse.getStatus()) {
-                case BUFFER_OVERFLOW:
-                    throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
-                case BUFFER_UNDERFLOW: {
-//                appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
-
-                    final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
-                    final int bytesRead = readData(writableInBuffer);
-                    if (bytesRead < 0) {
-                        return -1;
-                    }
-
-                    continue;
-                }
-                case CLOSED:
-                    throw new IOException("Channel is closed");
-                case OK: {
-                    copied = copyFromAppDataBuffer(buffer, offset, len);
-                    if (copied == 0) {
-                        throw new IOException("Failed to decrypt data");
-                    }
-                    streamInManager.compact();
-                    return copied;
-                }
-            }
-        }
-    }
-
-    public void write(final int data) throws IOException {
-        write(new byte[]{(byte) data}, 0, 1);
-    }
-
-    public void write(final byte[] data) throws IOException {
-        write(data, 0, data.length);
-    }
-
-    public void write(final byte[] data, final int offset, final int len) throws IOException {
-        logger.debug("{} Writing {} bytes of data", this, len);
-
-        if (!connected) {
-            connect();
-        }
-
-        int iterations = len / MAX_WRITE_SIZE;
-        if (len % MAX_WRITE_SIZE > 0) {
-            iterations++;
-        }
-
-        for (int i = 0; i < iterations; i++) {
-            streamOutManager.clear();
-            final int itrOffset = offset + i * MAX_WRITE_SIZE;
-            final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE);
-            final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
-
-            final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
-            final Status status = encryptAndWriteFully(buffMan);
-            switch (status) {
-                case BUFFER_OVERFLOW:
-                    streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
-                    appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
-                    continue;
-                case OK:
-                    continue;
-                case CLOSED:
-                    throw new IOException("Channel is closed");
-                case BUFFER_UNDERFLOW:
-                    throw new AssertionError("Got Buffer Underflow but should not have...");
-            }
-        }
-    }
-
-    public void interrupt() {
-        this.interrupted = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
deleted file mode 100644
index 154bd08..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SSLSocketChannelInputStream extends InputStream {
-
-    private final SSLSocketChannel channel;
-
-    public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
-        this.channel = channel;
-    }
-
-    @Override
-    public int read() throws IOException {
-        return channel.read();
-    }
-
-    @Override
-    public int read(final byte[] b) throws IOException {
-        return channel.read(b);
-    }
-
-    @Override
-    public int read(final byte[] b, final int off, final int len) throws IOException {
-        return channel.read(b, off, len);
-    }
-
-    /**
-     * Closes the underlying SSLSocketChannel, which will also close the
-     * OutputStream and connection
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-
-    @Override
-    public int available() throws IOException {
-        return channel.available();
-    }
-
-    public boolean isDataAvailable() throws IOException {
-        return available() > 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
deleted file mode 100644
index ce4e420..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class SSLSocketChannelOutputStream extends OutputStream {
-
-    private final SSLSocketChannel channel;
-
-    public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
-        this.channel = channel;
-    }
-
-    @Override
-    public void write(final int b) throws IOException {
-        channel.write(b);
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-        channel.write(b);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        channel.write(b, off, len);
-    }
-
-    /**
-     * Closes the underlying SSLSocketChannel, which also will close the
-     * InputStream and the connection
-     */
-    @Override
-    public void close() throws IOException {
-        channel.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
deleted file mode 100644
index aaf37ea..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.InputStream;
-
-/**
- * This class is a thin subclass of the BufferedInputStream in the java.io
- * package. It declares only constructors and overrides no methods, so every
- * call is handled by java.io.BufferedInputStream, including any
- * synchronization that class performs. It does not remove synchronization
- * and therefore offers no performance advantage over the parent class.
- */
-public class BufferedInputStream extends java.io.BufferedInputStream {
-
-    public BufferedInputStream(final InputStream in) {
-        super(in);
-    }
-
-    public BufferedInputStream(final InputStream in, final int size) {
-        super(in, size);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
deleted file mode 100644
index eadfcab..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * This class is a slight modification of the
- * {@link java.io.BufferedOutputStream} class. This implementation differs in
- * that it does not mark methods as synchronized. This means that this class is
- * not suitable for writing by multiple concurrent threads. However, the removal
- * of the synchronized keyword results in potentially much better performance.
- */
-public class BufferedOutputStream extends FilterOutputStream {
-
-    /**
-     * The internal buffer where data is stored.
-     */
-    protected byte buf[];
-
-    /**
-     * The number of valid bytes in the buffer. This value is always in the
-     * range <tt>0</tt> through <tt>buf.length</tt>; elements
-     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte data.
-     */
-    protected int count;
-
-    /**
-     * Creates a new buffered output stream to write data to the specified
-     * underlying output stream.
-     *
-     * @param out the underlying output stream.
-     */
-    public BufferedOutputStream(OutputStream out) {
-        this(out, 8192);
-    }
-
-    /**
-     * Creates a new buffered output stream to write data to the specified
-     * underlying output stream with the specified buffer size.
-     *
-     * @param out the underlying output stream.
-     * @param size the buffer size.
-     * @exception IllegalArgumentException if size &lt;= 0.
-     */
-    public BufferedOutputStream(OutputStream out, int size) {
-        super(out);
-        if (size <= 0) {
-            throw new IllegalArgumentException("Buffer size <= 0");
-        }
-        buf = new byte[size];
-    }
-
-    /**
-     * Flush the internal buffer
-     */
-    private void flushBuffer() throws IOException {
-        if (count > 0) {
-            out.write(buf, 0, count);
-            count = 0;
-        }
-    }
-
-    /**
-     * Writes the specified byte to this buffered output stream.
-     *
-     * @param b the byte to be written.
-     * @exception IOException if an I/O error occurs.
-     */
-    @Override
-    public void write(int b) throws IOException {
-        if (count >= buf.length) {
-            flushBuffer();
-        }
-        buf[count++] = (byte) b;
-    }
-
-    /**
-     * Writes <code>len</code> bytes from the specified byte array starting at
-     * offset <code>off</code> to this buffered output stream.
-     *
-     * <p>
-     * Ordinarily this method stores bytes from the given array into this
-     * stream's buffer, flushing the buffer to the underlying output stream as
-     * needed. If the requested length is at least as large as this stream's
-     * buffer, however, then this method will flush the buffer and write the
-     * bytes directly to the underlying output stream. Thus redundant
-     * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
-     *
-     * @param b the data.
-     * @param off the start offset in the data.
-     * @param len the number of bytes to write.
-     * @exception IOException if an I/O error occurs.
-     */
-    @Override
-    public void write(byte b[], int off, int len) throws IOException {
-        if (len >= buf.length) {
-            /* If the request length equals or exceeds the size of the output buffer,
-             flush the output buffer and then write the data directly.
-             In this way buffered streams will cascade harmlessly. */
-            flushBuffer();
-            out.write(b, off, len);
-            return;
-        }
-        if (len >= buf.length - count) {
-            flushBuffer();
-        }
-        System.arraycopy(b, off, buf, count, len);
-        count += len;
-    }
-
-    /**
-     * Flushes this buffered output stream. This forces any buffered output
-     * bytes to be written out to the underlying output stream.
-     *
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public void flush() throws IOException {
-        flushBuffer();
-        out.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
deleted file mode 100644
index 284cd54..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.InputStream;
-
-/**
- * This class performs the same function as java.io.ByteArrayInputStream but
- * does not mark its methods as synchronized
- */
-public class ByteArrayInputStream extends InputStream {
-
-    /**
-     * An array of bytes that was provided by the creator of the stream.
-     * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the
-     * only bytes that can ever be read from the stream; element
-     * <code>buf[pos]</code> is the next byte to be read.
-     */
-    protected byte buf[];
-
-    /**
-     * The index of the next character to read from the input stream buffer.
-     * This value should always be nonnegative and not larger than the value of
-     * <code>count</code>. The next byte to be read from the input stream buffer
-     * will be <code>buf[pos]</code>.
-     */
-    protected int pos;
-
-    /**
-     * The currently marked position in the stream. ByteArrayInputStream objects
-     * are marked at position zero by default when constructed. They may be
-     * marked at another position within the buffer by the <code>mark()</code>
-     * method. The current buffer position is set to this point by the
-     * <code>reset()</code> method.
-     * <p>
-     * If no mark has been set, then the value of mark is the offset passed to
-     * the constructor (or 0 if the offset was not supplied).
-     *
-     * @since JDK1.1
-     */
-    protected int mark = 0;
-
-    /**
-     * The index one greater than the last valid character in the input stream
-     * buffer. This value should always be nonnegative and not larger than the
-     * length of <code>buf</code>. It is one greater than the position of the
-     * last byte within <code>buf</code> that can ever be read from the input
-     * stream buffer.
-     */
-    protected int count;
-
-    /**
-     * Creates a <code>ByteArrayInputStream</code> so that it uses
-     * <code>buf</code> as its buffer array. The buffer array is not copied. The
-     * initial value of <code>pos</code> is <code>0</code> and the initial value
-     * of  <code>count</code> is the length of <code>buf</code>.
-     *
-     * @param buf the input buffer.
-     */
-    public ByteArrayInputStream(byte buf[]) {
-        this.buf = buf;
-        this.pos = 0;
-        this.count = buf.length;
-    }
-
-    /**
-     * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as
-     * its buffer array. The initial value of <code>pos</code> is
-     * <code>offset</code> and the initial value of <code>count</code> is the
-     * minimum of <code>offset+length</code> and <code>buf.length</code>. The
-     * buffer array is not copied. The buffer's mark is set to the specified
-     * offset.
-     *
-     * @param buf the input buffer.
-     * @param offset the offset in the buffer of the first byte to read.
-     * @param length the maximum number of bytes to read from the buffer.
-     */
-    public ByteArrayInputStream(byte buf[], int offset, int length) {
-        this.buf = buf;
-        this.pos = offset;
-        this.count = Math.min(offset + length, buf.length);
-        this.mark = offset;
-    }
-
-    /**
-     * Reads the next byte of data from this input stream. The value byte is
-     * returned as an <code>int</code> in the range <code>0</code> to
-     * <code>255</code>. If no byte is available because the end of the stream
-     * has been reached, the value <code>-1</code> is returned.
-     * <p>
-     * This <code>read</code> method cannot block.
-     *
-     * @return the next byte of data, or <code>-1</code> if the end of the
-     * stream has been reached.
-     */
-    @Override
-    public int read() {
-        return (pos < count) ? (buf[pos++] & 0xff) : -1;
-    }
-
-    /**
-     * Reads up to <code>len</code> bytes of data into an array of bytes from
-     * this input stream. If <code>pos</code> equals <code>count</code>, then
-     * <code>-1</code> is returned to indicate end of file. Otherwise, the
-     * number <code>k</code> of bytes read is equal to the smaller of
-     * <code>len</code> and <code>count-pos</code>. If <code>k</code> is
-     * positive, then bytes <code>buf[pos]</code> through
-     * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through
-     * <code>b[off+k-1]</code> in the manner performed by
-     * <code>System.arraycopy</code>. The value <code>k</code> is added into
-     * <code>pos</code> and <code>k</code> is returned.
-     * <p>
-     * This <code>read</code> method cannot block.
-     *
-     * @param b the buffer into which the data is read.
-     * @param off the start offset in the destination array <code>b</code>
-     * @param len the maximum number of bytes read.
-     * @return the total number of bytes read into the buffer, or
-     * <code>-1</code> if there is no more data because the end of the stream
-     * has been reached.
-     * @exception NullPointerException If <code>b</code> is <code>null</code>.
-     * @exception IndexOutOfBoundsException If <code>off</code> is negative,
-     * <code>len</code> is negative, or <code>len</code> is greater than
-     * <code>b.length - off</code>
-     */
-    @Override
-    public int read(byte b[], int off, int len) {
-        if (b == null) {
-            throw new NullPointerException();
-        } else if (off < 0 || len < 0 || len > b.length - off) {
-            throw new IndexOutOfBoundsException();
-        }
-
-        if (pos >= count) {
-            return -1;
-        }
-
-        int avail = count - pos;
-        if (len > avail) {
-            len = avail;
-        }
-        if (len <= 0) {
-            return 0;
-        }
-        System.arraycopy(buf, pos, b, off, len);
-        pos += len;
-        return len;
-    }
-
-    /**
-     * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
-     * might be skipped if the end of the input stream is reached. The actual
-     * number <code>k</code> of bytes to be skipped is equal to the smaller of
-     * <code>n</code> and  <code>count-pos</code>. The value <code>k</code> is
-     * added into <code>pos</code> and <code>k</code> is returned.
-     *
-     * @param n the number of bytes to be skipped.
-     * @return the actual number of bytes skipped.
-     */
-    @Override
-    public long skip(long n) {
-        long k = count - pos;
-        if (n < k) {
-            k = n < 0 ? 0 : n;
-        }
-
-        pos += k;
-        return k;
-    }
-
-    /**
-     * Returns the number of remaining bytes that can be read (or skipped over)
-     * from this input stream.
-     * <p>
-     * The value returned is <code>count&nbsp;- pos</code>, which is the number
-     * of bytes remaining to be read from the input buffer.
-     *
-     * @return the number of remaining bytes that can be read (or skipped over)
-     * from this input stream without blocking.
-     */
-    @Override
-    public int available() {
-        return count - pos;
-    }
-
-    /**
-     * Tests if this <code>InputStream</code> supports mark/reset. The
-     * <code>markSupported</code> method of <code>ByteArrayInputStream</code>
-     * always returns <code>true</code>.
-     *
-     * @since JDK1.1
-     */
-    @Override
-    public boolean markSupported() {
-        return true;
-    }
-
-    /**
-     * Set the current marked position in the stream. ByteArrayInputStream
-     * objects are marked at position zero by default when constructed. They may
-     * be marked at another position within the buffer by this method.
-     * <p>
-     * If no mark has been set, then the value of the mark is the offset passed
-     * to the constructor (or 0 if the offset was not supplied).
-     *
-     * <p>
-     * Note: The <code>readAheadLimit</code> for this class has no meaning.
-     *
-     * @since JDK1.1
-     */
-    @Override
-    public void mark(int readAheadLimit) {
-        mark = pos;
-    }
-
-    /**
-     * Resets the buffer to the marked position. The marked position is 0 unless
-     * another position was marked or an offset was specified in the
-     * constructor.
-     */
-    @Override
-    public void reset() {
-        pos = mark;
-    }
-
-    /**
-     * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
-     * this class can be called after the stream has been closed without
-     * generating an <tt>IOException</tt>.
-     * <p>
-     */
-    @Override
-    public void close() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
deleted file mode 100644
index 459563b..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-/**
- * This class provides a more efficient implementation of the
- * java.io.ByteArrayOutputStream. The efficiency is gained in two ways:
- * <ul>
- * <li>The write methods are not synchronized</li>
- * <li>The class provides {@link #getUnderlyingBuffer()} and
- * {@link #getBufferLength()}, which can be used to access the underlying byte
- * array directly, rather than the System.arraycopy that {@link #toByteArray()}
- * uses</li>
- * </ul>
- *
- */
-public class ByteArrayOutputStream extends OutputStream {
-
-    /**
-     * The buffer where data is stored.
-     */
-    protected byte buf[];
-
-    /**
-     * The number of valid bytes in the buffer.
-     */
-    protected int count;
-
-    /**
-     * Creates a new byte array output stream. The buffer capacity is initially
-     * 32 bytes, though its size increases if necessary.
-     */
-    public ByteArrayOutputStream() {
-        this(32);
-    }
-
-    /**
-     * Creates a new byte array output stream, with a buffer capacity of the
-     * specified size, in bytes.
-     *
-     * @param size the initial size.
-     * @exception IllegalArgumentException if size is negative.
-     */
-    public ByteArrayOutputStream(int size) {
-        if (size < 0) {
-            throw new IllegalArgumentException("Negative initial size: "
-                    + size);
-        }
-        buf = new byte[size];
-    }
-
-    /**
-     * Increases the capacity if necessary to ensure that it can hold at least
-     * the number of elements specified by the minimum capacity argument.
-     *
-     * @param minCapacity the desired minimum capacity
-     * @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted
-     * as a request for the unsatisfiably large capacity
-     * {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
-     */
-    private void ensureCapacity(int minCapacity) {
-        // overflow-conscious code
-        if (minCapacity - buf.length > 0) {
-            grow(minCapacity);
-        }
-    }
-
-    /**
-     * Increases the capacity to ensure that it can hold at least the number of
-     * elements specified by the minimum capacity argument.
-     *
-     * @param minCapacity the desired minimum capacity
-     */
-    private void grow(int minCapacity) {
-        // overflow-conscious code
-        int oldCapacity = buf.length;
-        int newCapacity = oldCapacity << 1;
-        if (newCapacity - minCapacity < 0) {
-            newCapacity = minCapacity;
-        }
-        if (newCapacity < 0) {
-            if (minCapacity < 0) // overflow
-            {
-                throw new OutOfMemoryError();
-            }
-            newCapacity = Integer.MAX_VALUE;
-        }
-        buf = Arrays.copyOf(buf, newCapacity);
-    }
-
-    /**
-     * Writes the specified byte to this byte array output stream.
-     *
-     * @param b the byte to be written.
-     */
-    @Override
-    public void write(int b) {
-        ensureCapacity(count + 1);
-        buf[count] = (byte) b;
-        count += 1;
-    }
-
-    /**
-     * Writes <code>len</code> bytes from the specified byte array starting at
-     * offset <code>off</code> to this byte array output stream.
-     *
-     * @param b the data.
-     * @param off the start offset in the data.
-     * @param len the number of bytes to write.
-     */
-    @Override
-    public void write(byte b[], int off, int len) {
-        if ((off < 0) || (off > b.length) || (len < 0)
-                || ((off + len) - b.length > 0)) {
-            throw new IndexOutOfBoundsException();
-        }
-        ensureCapacity(count + len);
-        System.arraycopy(b, off, buf, count, len);
-        count += len;
-    }
-
-    /**
-     * Writes the complete contents of this byte array output stream to the
-     * specified output stream argument, as if by calling the output stream's
-     * write method using <code>out.write(buf, 0, count)</code>.
-     *
-     * @param out the output stream to which to write the data.
-     * @exception IOException if an I/O error occurs.
-     */
-    public void writeTo(OutputStream out) throws IOException {
-        out.write(buf, 0, count);
-    }
-
-    /**
-     * Resets the <code>count</code> field of this byte array output stream to
-     * zero, so that all currently accumulated output in the output stream is
-     * discarded. The output stream can be used again, reusing the already
-     * allocated buffer space.
-     *
-     * @see java.io.ByteArrayInputStream#count
-     */
-    public void reset() {
-        count = 0;
-    }
-
-    /**
-     * Creates a newly allocated byte array. Its size is the current size of
-     * this output stream and the valid contents of the buffer have been copied
-     * into it.
-     *
-     * @return the current contents of this output stream, as a byte array.
-     * @see java.io.ByteArrayOutputStream#size()
-     */
-    public byte toByteArray   () 
-        [] {
-        return Arrays.copyOf(buf, count);
-    }
-
-    /**
-     * Returns the current size of the buffer.
-     *
-     * @return the value of the <code>count</code> field, which is the number of
-     * valid bytes in this output stream.
-     * @see java.io.ByteArrayOutputStream#count
-     */
-    public int size() {
-        return count;
-    }
-
-    /**
-     * Converts the buffer's contents into a string decoding bytes using the
-     * platform's default character set. The length of the new <tt>String</tt>
-     * is a function of the character set, and hence may not be equal to the
-     * size of the buffer.
-     *
-     * <p>
-     * This method always replaces malformed-input and unmappable-character
-     * sequences with the default replacement string for the platform's default
-     * character set. The {@linkplain java.nio.charset.CharsetDecoder} class
-     * should be used when more control over the decoding process is required.
-     *
-     * @return String decoded from the buffer's contents.
-     * @since JDK1.1
-     */
-    @Override
-    public String toString() {
-        return new String(buf, 0, count);
-    }
-
-    /**
-     * Converts the buffer's contents into a string by decoding the bytes using
-     * the specified {@link java.nio.charset.Charset charsetName}. The length of
-     * the new <tt>String</tt> is a function of the charset, and hence may not
-     * be equal to the length of the byte array.
-     *
-     * <p>
-     * This method always replaces malformed-input and unmappable-character
-     * sequences with this charset's default replacement string. The {@link
-     * java.nio.charset.CharsetDecoder} class should be used when more control
-     * over the decoding process is required.
-     *
-     * @param charsetName the name of a supported
-     *              {@linkplain java.nio.charset.Charset <code>charset</code>}
-     * @return String decoded from the buffer's contents.
-     * @exception UnsupportedEncodingException If the named charset is not
-     * supported
-     * @since JDK1.1
-     */
-    public String toString(String charsetName) throws UnsupportedEncodingException {
-        return new String(buf, 0, count, charsetName);
-    }
-
-    /**
-     * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
-     * this class can be called after the stream has been closed without
-     * generating an <tt>IOException</tt>.
-     * <p>
-     *
-     */
-    @Override
-    public void close() {
-    }
-
-    public byte[] getUnderlyingBuffer() {
-        return buf;
-    }
-
-    public int getBufferLength() {
-        return count;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
deleted file mode 100644
index 8294af3..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class ByteCountingInputStream extends InputStream {
-
-    private final InputStream in;
-    private long bytesRead = 0L;
-    private long bytesSkipped = 0L;
-
-    private long bytesSinceMark = 0L;
-
-    public ByteCountingInputStream(final InputStream in) {
-        this.in = in;
-    }
-
-    @Override
-    public int read() throws IOException {
-        final int fromSuper = in.read();
-        if (fromSuper >= 0) {
-            bytesRead++;
-            bytesSinceMark++;
-        }
-        return fromSuper;
-    }
-
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        final int fromSuper = in.read(b, off, len);
-        if (fromSuper >= 0) {
-            bytesRead += fromSuper;
-            bytesSinceMark += fromSuper;
-        }
-
-        return fromSuper;
-    }
-
-    @Override
-    public int read(byte[] b) throws IOException {
-        return read(b, 0, b.length);
-    }
-
-    @Override
-    public long skip(final long n) throws IOException {
-        final long skipped = in.skip(n);
-        if (skipped >= 0) {
-            bytesSkipped += skipped;
-            bytesSinceMark += skipped;
-        }
-        return skipped;
-    }
-
-    public long getBytesRead() {
-        return bytesRead;
-    }
-
-    public long getBytesSkipped() {
-        return bytesSkipped;
-    }
-
-    public long getBytesConsumed() {
-        return getBytesRead() + getBytesSkipped();
-    }
-
-    @Override
-    public void mark(final int readlimit) {
-        in.mark(readlimit);
-
-        bytesSinceMark = 0L;
-    }
-
-    @Override
-    public boolean markSupported() {
-        return in.markSupported();
-    }
-
-    @Override
-    public void reset() throws IOException {
-        in.reset();
-        bytesRead -= bytesSinceMark;
-    }
-
-    @Override
-    public void close() throws IOException {
-        in.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
deleted file mode 100644
index d8e1a42..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class ByteCountingOutputStream extends OutputStream {
-
-    private final OutputStream out;
-    private long bytesWritten = 0L;
-
-    public ByteCountingOutputStream(final OutputStream out) {
-        this.out = out;
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-        out.write(b);
-        bytesWritten++;
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-        write(b, 0, b.length);
-    }
-
-    ;
-    
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        out.write(b, off, len);
-        bytesWritten += len;
-    }
-
-    public long getBytesWritten() {
-        return bytesWritten;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-        out.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
deleted file mode 100644
index 1dd90f5..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.DataOutput;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * This class is different from java.io.DataOutputStream in that it does
- * not synchronize on its methods.
- */
-public class DataOutputStream extends FilterOutputStream implements DataOutput {
-
-    /**
-     * The number of bytes written to the data output stream so far. If this
-     * counter overflows, it will be capped at Integer.MAX_VALUE.
-     */
-    protected int written;
-
-    /**
-     * bytearr is initialized on demand by writeUTF
-     */
-    private byte[] bytearr = null;
-
-    /**
-     * Creates a new data output stream to write data to the specified
-     * underlying output stream. The counter <code>written</code> is set to
-     * zero.
-     *
-     * @param out the underlying output stream, to be saved for later use.
-     * @see java.io.FilterOutputStream#out
-     */
-    public DataOutputStream(OutputStream out) {
-        super(out);
-    }
-
-    /**
-     * Increases the written counter by the specified value until it reaches
-     * Integer.MAX_VALUE.
-     */
-    private void incCount(int value) {
-        int temp = written + value;
-        if (temp < 0) {
-            temp = Integer.MAX_VALUE;
-        }
-        written = temp;
-    }
-
-    /**
-     * Writes the specified byte (the low eight bits of the argument
-     * <code>b</code>) to the underlying output stream. If no exception is
-     * thrown, the counter <code>written</code> is incremented by
-     * <code>1</code>.
-     * <p>
-     * Implements the <code>write</code> method of <code>OutputStream</code>.
-     *
-     * @param b the <code>byte</code> to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public void write(int b) throws IOException {
-        out.write(b);
-        incCount(1);
-    }
-
-    /**
-     * Writes <code>len</code> bytes from the specified byte array starting at
-     * offset <code>off</code> to the underlying output stream. If no exception
-     * is thrown, the counter <code>written</code> is incremented by
-     * <code>len</code>.
-     *
-     * @param b the data.
-     * @param off the start offset in the data.
-     * @param len the number of bytes to write.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public void write(byte b[], int off, int len) throws IOException {
-        out.write(b, off, len);
-        incCount(len);
-    }
-
-    /**
-     * Flushes this data output stream. This forces any buffered output bytes to
-     * be written out to the stream.
-     * <p>
-     * The <code>flush</code> method of <code>DataOutputStream</code> calls the
-     * <code>flush</code> method of its underlying output stream.
-     *
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     * @see java.io.OutputStream#flush()
-     */
-    @Override
-    public void flush() throws IOException {
-        out.flush();
-    }
-
-    /**
-     * Writes a <code>boolean</code> to the underlying output stream as a 1-byte
-     * value. The value <code>true</code> is written out as the value
-     * <code>(byte)1</code>; the value <code>false</code> is written out as the
-     * value <code>(byte)0</code>. If no exception is thrown, the counter
-     * <code>written</code> is incremented by <code>1</code>.
-     *
-     * @param v a <code>boolean</code> value to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeBoolean(boolean v) throws IOException {
-        out.write(v ? 1 : 0);
-        incCount(1);
-    }
-
-    /**
-     * Writes out a <code>byte</code> to the underlying output stream as a
-     * 1-byte value. If no exception is thrown, the counter <code>written</code>
-     * is incremented by <code>1</code>.
-     *
-     * @param v a <code>byte</code> value to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeByte(int v) throws IOException {
-        out.write(v);
-        incCount(1);
-    }
-
-    /**
-     * Writes a <code>short</code> to the underlying output stream as two bytes,
-     * high byte first. If no exception is thrown, the counter
-     * <code>written</code> is incremented by <code>2</code>.
-     *
-     * @param v a <code>short</code> to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeShort(int v) throws IOException {
-        out.write((v >>> 8) & 0xFF);
-        out.write((v) & 0xFF);
-        incCount(2);
-    }
-
-    /**
-     * Writes a <code>char</code> to the underlying output stream as a 2-byte
-     * value, high byte first. If no exception is thrown, the counter
-     * <code>written</code> is incremented by <code>2</code>.
-     *
-     * @param v a <code>char</code> value to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeChar(int v) throws IOException {
-        out.write((v >>> 8) & 0xFF);
-        out.write((v) & 0xFF);
-        incCount(2);
-    }
-
-    /**
-     * Writes an <code>int</code> to the underlying output stream as four bytes,
-     * high byte first. If no exception is thrown, the counter
-     * <code>written</code> is incremented by <code>4</code>.
-     *
-     * @param v an <code>int</code> to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeInt(int v) throws IOException {
-        out.write((v >>> 24) & 0xFF);
-        out.write((v >>> 16) & 0xFF);
-        out.write((v >>> 8) & 0xFF);
-        out.write((v) & 0xFF);
-        incCount(4);
-    }
-
-    private final byte writeBuffer[] = new byte[8];
-
-    /**
-     * Writes a <code>long</code> to the underlying output stream as eight
-     * bytes, high byte first. If no exception is thrown, the counter
-     * <code>written</code> is incremented by <code>8</code>.
-     *
-     * @param v a <code>long</code> to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeLong(long v) throws IOException {
-        writeBuffer[0] = (byte) (v >>> 56);
-        writeBuffer[1] = (byte) (v >>> 48);
-        writeBuffer[2] = (byte) (v >>> 40);
-        writeBuffer[3] = (byte) (v >>> 32);
-        writeBuffer[4] = (byte) (v >>> 24);
-        writeBuffer[5] = (byte) (v >>> 16);
-        writeBuffer[6] = (byte) (v >>> 8);
-        writeBuffer[7] = (byte) (v);
-        out.write(writeBuffer, 0, 8);
-        incCount(8);
-    }
-
-    /**
-     * Converts the float argument to an <code>int</code> using the
-     * <code>floatToIntBits</code> method in class <code>Float</code>, and then
-     * writes that <code>int</code> value to the underlying output stream as a
-     * 4-byte quantity, high byte first. If no exception is thrown, the counter
-     * <code>written</code> is incremented by <code>4</code>.
-     *
-     * @param v a <code>float</code> value to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     * @see java.lang.Float#floatToIntBits(float)
-     */
-    @Override
-    public final void writeFloat(float v) throws IOException {
-        writeInt(Float.floatToIntBits(v));
-    }
-
-    /**
-     * Converts the double argument to a <code>long</code> using the
-     * <code>doubleToLongBits</code> method in class <code>Double</code>, and
-     * then writes that <code>long</code> value to the underlying output stream
-     * as an 8-byte quantity, high byte first. If no exception is thrown, the
-     * counter <code>written</code> is incremented by <code>8</code>.
-     *
-     * @param v a <code>double</code> value to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     * @see java.lang.Double#doubleToLongBits(double)
-     */
-    @Override
-    public final void writeDouble(double v) throws IOException {
-        writeLong(Double.doubleToLongBits(v));
-    }
-
-    /**
-     * Writes out the string to the underlying output stream as a sequence of
-     * bytes. Each character in the string is written out, in sequence, by
-     * discarding its high eight bits. If no exception is thrown, the counter
-     * <code>written</code> is incremented by the length of <code>s</code>.
-     *
-     * @param s a string of bytes to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeBytes(String s) throws IOException {
-        int len = s.length();
-        for (int i = 0; i < len; i++) {
-            out.write((byte) s.charAt(i));
-        }
-        incCount(len);
-    }
-
-    /**
-     * Writes a string to the underlying output stream as a sequence of
-     * characters. Each character is written to the data output stream as if by
-     * the <code>writeChar</code> method. If no exception is thrown, the counter
-     * <code>written</code> is incremented by twice the length of
-     * <code>s</code>.
-     *
-     * @param s a <code>String</code> value to be written.
-     * @exception IOException if an I/O error occurs.
-     * @see java.io.DataOutputStream#writeChar(int)
-     * @see java.io.FilterOutputStream#out
-     */
-    @Override
-    public final void writeChars(String s) throws IOException {
-        int len = s.length();
-        for (int i = 0; i < len; i++) {
-            int v = s.charAt(i);
-            out.write((v >>> 8) & 0xFF);
-            out.write((v) & 0xFF);
-        }
-        incCount(len * 2);
-    }
-
-    /**
-     * Writes a string to the underlying output stream using
-     * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
-     * encoding in a machine-independent manner.
-     * <p>
-     * First, two bytes are written to the output stream as if by the
-     * <code>writeShort</code> method giving the number of bytes to follow. This
-     * value is the number of bytes actually written out, not the length of the
-     * string. Following the length, each character of the string is output, in
-     * sequence, using the modified UTF-8 encoding for the character. If no
-     * exception is thrown, the counter <code>written</code> is incremented by
-     * the total number of bytes written to the output stream. This will be at
-     * least two plus the length of <code>str</code>, and at most two plus
-     * thrice the length of <code>str</code>.
-     *
-     * @param str a string to be written.
-     * @exception IOException if an I/O error occurs.
-     */
-    @Override
-    public final void writeUTF(String str) throws IOException {
-        writeUTF(str, this);
-    }
-
-    /**
-     * Writes a string to the specified DataOutput using
-     * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
-     * encoding in a machine-independent manner.
-     * <p>
-     * First, two bytes are written to out as if by the <code>writeShort</code>
-     * method giving the number of bytes to follow. This value is the number of
-     * bytes actually written out, not the length of the string. Following the
-     * length, each character of the string is output, in sequence, using the
-     * modified UTF-8 encoding for the character. If no exception is thrown, the
-     * counter <code>written</code> is incremented by the total number of bytes
-     * written to the output stream. This will be at least two plus the length
-     * of <code>str</code>, and at most two plus thrice the length of
-     * <code>str</code>.
-     *
-     * @param str a string to be written.
-     * @param out destination to write to
-     * @return The number of bytes written out.
-     * @exception IOException if an I/O error occurs.
-     */
-    static int writeUTF(String str, DataOutput out) throws IOException {
-        int strlen = str.length();
-        int utflen = 0;
-        int c, count = 0;
-
-        /* use charAt instead of copying String to char array */
-        for (int i = 0; i < strlen; i++) {
-            c = str.charAt(i);
-            if ((c >= 0x0001) && (c <= 0x007F)) {
-                utflen++;
-            } else if (c > 0x07FF) {
-                utflen += 3;
-            } else {
-                utflen += 2;
-            }
-        }
-
-        if (utflen > 65535) {
-            throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
-        }
-
-        byte[] bytearr = null;
-        if (out instanceof DataOutputStream) {
-            DataOutputStream dos = (DataOutputStream) out;
-            if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
-                dos.bytearr = new byte[(utflen * 2) + 2];
-            }
-            bytearr = dos.bytearr;
-        } else {
-            bytearr = new byte[utflen + 2];
-        }
-
-        bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
-        bytearr[count++] = (byte) ((utflen) & 0xFF);
-
-        int i = 0;
-        for (i = 0; i < strlen; i++) {
-            c = str.charAt(i);
-            if (!((c >= 0x0001) && (c <= 0x007F))) {
-                break;
-            }
-            bytearr[count++] = (byte) c;
-        }
-
-        for (; i < strlen; i++) {
-            c = str.charAt(i);
-            if ((c >= 0x0001) && (c <= 0x007F)) {
-                bytearr[count++] = (byte) c;
-
-            } else if (c > 0x07FF) {
-                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
-                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
-                bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
-            } else {
-                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
-                bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
-            }
-        }
-        out.write(bytearr, 0, utflen + 2);
-        return utflen + 2;
-    }
-
-    /**
-     * Returns the current value of the counter <code>written</code>, the number
-     * of bytes written to this data output stream so far. If the counter
-     * overflows, it will be capped at Integer.MAX_VALUE.
-     *
-     * @return the value of the <code>written</code> field.
-     * @see java.io.DataOutputStream#written
-     */
-    public final int size() {
-        return written;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
deleted file mode 100644
index 2864bbb..0000000
--- a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * <p>
- * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the
- * constructor to provide a compression level, and uses a default value of 1,
- * rather than the JDK default of {@link java.util.zip.Deflater#DEFAULT_COMPRESSION}.
- * </p>
- */
-public class GZIPOutputStream extends java.util.zip.GZIPOutputStream {
-
-    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-
-    public GZIPOutputStream(final OutputStream out) throws IOException {
-        this(out, DEFAULT_COMPRESSION_LEVEL);
-    }
-
-    public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException {
-        super(out);
-        def.setLevel(compressionLevel);
-    }
-}