You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/19 19:16:13 UTC
[39/59] [abbrv] [partial] incubator-nifi git commit: Reworked overall
directory structure to make releasing nifi vs maven plugis easier
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
deleted file mode 100644
index 5810488..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.security.cert.CertificateExpiredException;
-import javax.security.cert.CertificateNotYetValidException;
-import javax.security.cert.X509Certificate;
-
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-import org.apache.nifi.remote.io.socket.BufferStateManager;
-import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SSLSocketChannel implements Closeable {
-
- public static final int MAX_WRITE_SIZE = 65536;
-
- private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
- private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
-
- private final String hostname;
- private final int port;
- private final SSLEngine engine;
- private final SocketAddress socketAddress;
-
- private BufferStateManager streamInManager;
- private BufferStateManager streamOutManager;
- private BufferStateManager appDataManager;
-
- private SocketChannel channel;
-
- private final byte[] oneByteBuffer = new byte[1];
-
- private int timeoutMillis = 30000;
- private volatile boolean connected = false;
- private boolean handshaking = false;
- private boolean closed = false;
- private volatile boolean interrupted = false;
-
- public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
- this.socketAddress = new InetSocketAddress(hostname, port);
- this.channel = SocketChannel.open();
- this.hostname = hostname;
- this.port = port;
- this.engine = sslContext.createSSLEngine();
- this.engine.setUseClientMode(client);
- engine.setNeedClientAuth(true);
-
- streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
- streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
- appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
- }
-
- public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
- if (!socketChannel.isConnected()) {
- throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
- }
-
- this.channel = socketChannel;
-
- this.socketAddress = socketChannel.getRemoteAddress();
- final Socket socket = socketChannel.socket();
- this.hostname = socket.getInetAddress().getHostName();
- this.port = socket.getPort();
-
- this.engine = sslContext.createSSLEngine();
- this.engine.setUseClientMode(client);
- engine.setNeedClientAuth(true);
-
- streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
- streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
- appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
- }
-
- public void setTimeout(final int millis) {
- this.timeoutMillis = millis;
- }
-
- public int getTimeout() {
- return timeoutMillis;
- }
-
- public void connect() throws SSLHandshakeException, IOException {
- try {
- channel.configureBlocking(false);
- if (!channel.isConnected()) {
- final long startTime = System.currentTimeMillis();
-
- if (!channel.connect(socketAddress)) {
- while (!channel.finishConnect()) {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
- if (System.currentTimeMillis() > startTime + timeoutMillis) {
- throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
- }
-
- try {
- Thread.sleep(50L);
- } catch (final InterruptedException e) {
- }
- }
- }
- }
- engine.beginHandshake();
-
- performHandshake();
- logger.debug("{} Successfully completed SSL handshake", this);
-
- streamInManager.clear();
- streamOutManager.clear();
- appDataManager.clear();
-
- connected = true;
- } catch (final Exception e) {
- logger.error("{} Failed to connect due to {}", this, e);
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- closeQuietly(channel);
- engine.closeInbound();
- engine.closeOutbound();
- throw e;
- }
- }
-
- public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
- final X509Certificate[] certs = engine.getSession().getPeerCertificateChain();
- if (certs == null || certs.length == 0) {
- throw new SSLPeerUnverifiedException("No certificates found");
- }
-
- final X509Certificate cert = certs[0];
- cert.checkValidity();
- return cert.getSubjectDN().getName().trim();
- }
-
- private void performHandshake() throws IOException {
- // Generate handshake message
- final byte[] emptyMessage = new byte[0];
- handshaking = true;
- logger.debug("{} Performing Handshake", this);
-
- try {
- while (true) {
- switch (engine.getHandshakeStatus()) {
- case FINISHED:
- return;
- case NEED_WRAP: {
- final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
-
- final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-
- final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
- if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
- streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
- continue;
- }
-
- if (wrapHelloResult.getStatus() != Status.OK) {
- throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
- + wrapHelloResult.toString());
- }
-
- logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
-
- final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
- final int bytesToSend = readableStreamOut.remaining();
- writeFully(readableStreamOut);
- logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
-
- streamOutManager.clear();
- }
- continue;
- case NEED_UNWRAP: {
- final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
- final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-
- // Read handshake response from other side
- logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
- SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
- logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
-
- if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
- final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
- final int bytesRead = readData(writableDataIn);
- if (bytesRead > 0) {
- logger.trace("{} Read {} bytes for handshake", this, bytesRead);
- }
-
- if (bytesRead < 0) {
- throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
- }
- } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
- throw new IOException("Channel was closed by peer during handshake");
- } else {
- streamInManager.compact();
- appDataManager.clear();
- }
- }
- break;
- case NEED_TASK:
- performTasks();
- continue;
- case NOT_HANDSHAKING:
- return;
- }
- }
- } finally {
- handshaking = false;
- }
- }
-
- private void performTasks() {
- Runnable runnable;
- while ((runnable = engine.getDelegatedTask()) != null) {
- runnable.run();
- }
- }
-
- private void closeQuietly(final Closeable closeable) {
- try {
- closeable.close();
- } catch (final Exception e) {
- }
- }
-
- private int readData(final ByteBuffer dest) throws IOException {
- final long startTime = System.currentTimeMillis();
-
- while (true) {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- if (dest.remaining() == 0) {
- return 0;
- }
-
- final int readCount = channel.read(dest);
-
- if (readCount == 0) {
- if (System.currentTimeMillis() > startTime + timeoutMillis) {
- throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
- }
- try {
- TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
- } catch (InterruptedException e) {
- close();
- Thread.currentThread().interrupt(); // set the interrupt status
- throw new ClosedByInterruptException();
- }
-
- continue;
- }
-
- logger.trace("{} Read {} bytes", this, readCount);
- return readCount;
- }
- }
-
- private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
- SSLEngineResult result = null;
-
- final ByteBuffer buff = src.prepareForRead(0);
- final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
-
- logger.trace("{} Encrypting {} bytes", this, buff.remaining());
- while (buff.remaining() > 0) {
- result = engine.wrap(buff, outBuff);
- if (result.getStatus() == Status.OK) {
- final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
- writeFully(readableOutBuff);
- streamOutManager.clear();
- } else {
- return result.getStatus();
- }
- }
-
- return result.getStatus();
- }
-
- private void writeFully(final ByteBuffer src) throws IOException {
- long lastByteWrittenTime = System.currentTimeMillis();
-
- int bytesWritten = 0;
- while (src.hasRemaining()) {
- if (interrupted) {
- throw new TransmissionDisabledException();
- }
-
- final int written = channel.write(src);
- bytesWritten += written;
- final long now = System.currentTimeMillis();
- if (written > 0) {
- lastByteWrittenTime = now;
- } else {
- if (now > lastByteWrittenTime + timeoutMillis) {
- throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
- }
- try {
- TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
- } catch (final InterruptedException e) {
- close();
- Thread.currentThread().interrupt(); // set the interrupt status
- throw new ClosedByInterruptException();
- }
- }
- }
-
- logger.trace("{} Wrote {} bytes", this, bytesWritten);
- }
-
- public boolean isClosed() {
- if (closed) {
- return true;
- }
- // need to detect if peer has sent closure handshake...if so the answer is true
- final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
- int readCount = 0;
- try {
- readCount = channel.read(writableInBuffer);
- } catch (IOException e) {
- logger.error("{} Failed to readData due to {}", new Object[]{this, e});
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- readCount = -1; // treat the condition same as if End of Stream
- }
- if (readCount == 0) {
- return false;
- }
- if (readCount > 0) {
- logger.trace("{} Read {} bytes", this, readCount);
-
- final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
- final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
- try {
- SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
- logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
- if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
- // Drain the incoming TCP buffer
- final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
- int bytesDiscarded = channel.read(discardBuffer);
- while (bytesDiscarded > 0) {
- discardBuffer.clear();
- bytesDiscarded = channel.read(discardBuffer);
- }
- engine.closeInbound();
- } else {
- streamInManager.compact();
- return false;
- }
- } catch (IOException e) {
- logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
- }
- // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
- // so go ahead and close down the channel
- closeQuietly(channel.socket());
- closeQuietly(channel);
- closed = true;
- return true;
- }
-
- @Override
- public void close() throws IOException {
- logger.debug("{} Closing Connection", this);
- if (channel == null) {
- return;
- }
-
- if (closed) {
- return;
- }
-
- try {
- engine.closeOutbound();
-
- final byte[] emptyMessage = new byte[0];
-
- final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
- final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
- final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
-
- if (handshakeResult.getStatus() != Status.CLOSED) {
- throw new IOException("Invalid close state - will not send network data");
- }
-
- final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
- writeFully(readableStreamOut);
- } finally {
- // Drain the incoming TCP buffer
- final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
- try {
- int bytesDiscarded = channel.read(discardBuffer);
- while (bytesDiscarded > 0) {
- discardBuffer.clear();
- bytesDiscarded = channel.read(discardBuffer);
- }
- } catch (Exception e) {
- }
-
- closeQuietly(channel.socket());
- closeQuietly(channel);
- closed = true;
- }
- }
-
- private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
- // If any data already exists in the application data buffer, copy it to the buffer.
- final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
-
- final int appDataRemaining = appDataBuffer.remaining();
- if (appDataRemaining > 0) {
- final int bytesToCopy = Math.min(len, appDataBuffer.remaining());
- appDataBuffer.get(buffer, offset, bytesToCopy);
-
- final int bytesCopied = appDataRemaining - appDataBuffer.remaining();
- logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
- new Object[]{this, bytesToCopy, bytesCopied});
- return bytesCopied;
- }
- return 0;
- }
-
- public int available() throws IOException {
- ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
- ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
- final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining();
- if (buffered > 0) {
- return buffered;
- }
-
- final boolean wasAbleToRead = isDataAvailable();
- if (!wasAbleToRead) {
- return 0;
- }
-
- appDataBuffer = appDataManager.prepareForRead(1);
- streamDataBuffer = streamInManager.prepareForRead(1);
- return appDataBuffer.remaining() + streamDataBuffer.remaining();
- }
-
- public boolean isDataAvailable() throws IOException {
- final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
- final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
-
- if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) {
- return true;
- }
-
- final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
- final int bytesRead = channel.read(writableBuffer);
- return (bytesRead > 0);
- }
-
- public int read() throws IOException {
- final int bytesRead = read(oneByteBuffer);
- if (bytesRead == -1) {
- return -1;
- }
- return oneByteBuffer[0] & 0xFF;
- }
-
- public int read(final byte[] buffer) throws IOException {
- return read(buffer, 0, buffer.length);
- }
-
- public int read(final byte[] buffer, final int offset, final int len) throws IOException {
- logger.debug("{} Reading up to {} bytes of data", this, len);
-
- if (!connected) {
- connect();
- }
-
- int copied = copyFromAppDataBuffer(buffer, offset, len);
- if (copied > 0) {
- return copied;
- }
-
- appDataManager.clear();
-
- while (true) {
- // prepare buffers and call unwrap
- final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
- SSLEngineResult unwrapResponse = null;
- final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
- unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
- logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
-
- switch (unwrapResponse.getStatus()) {
- case BUFFER_OVERFLOW:
- throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
- case BUFFER_UNDERFLOW: {
-// appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
-
- final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
- final int bytesRead = readData(writableInBuffer);
- if (bytesRead < 0) {
- return -1;
- }
-
- continue;
- }
- case CLOSED:
- throw new IOException("Channel is closed");
- case OK: {
- copied = copyFromAppDataBuffer(buffer, offset, len);
- if (copied == 0) {
- throw new IOException("Failed to decrypt data");
- }
- streamInManager.compact();
- return copied;
- }
- }
- }
- }
-
- public void write(final int data) throws IOException {
- write(new byte[]{(byte) data}, 0, 1);
- }
-
- public void write(final byte[] data) throws IOException {
- write(data, 0, data.length);
- }
-
- public void write(final byte[] data, final int offset, final int len) throws IOException {
- logger.debug("{} Writing {} bytes of data", this, len);
-
- if (!connected) {
- connect();
- }
-
- int iterations = len / MAX_WRITE_SIZE;
- if (len % MAX_WRITE_SIZE > 0) {
- iterations++;
- }
-
- for (int i = 0; i < iterations; i++) {
- streamOutManager.clear();
- final int itrOffset = offset + i * MAX_WRITE_SIZE;
- final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE);
- final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
-
- final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
- final Status status = encryptAndWriteFully(buffMan);
- switch (status) {
- case BUFFER_OVERFLOW:
- streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
- appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
- continue;
- case OK:
- continue;
- case CLOSED:
- throw new IOException("Channel is closed");
- case BUFFER_UNDERFLOW:
- throw new AssertionError("Got Buffer Underflow but should not have...");
- }
- }
- }
-
- public void interrupt() {
- this.interrupted = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
deleted file mode 100644
index 154bd08..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class SSLSocketChannelInputStream extends InputStream {
-
- private final SSLSocketChannel channel;
-
- public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
- this.channel = channel;
- }
-
- @Override
- public int read() throws IOException {
- return channel.read();
- }
-
- @Override
- public int read(final byte[] b) throws IOException {
- return channel.read(b);
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- return channel.read(b, off, len);
- }
-
- /**
- * Closes the underlying SSLSocketChannel, which will also close the
- * OutputStream and connection
- */
- @Override
- public void close() throws IOException {
- channel.close();
- }
-
- @Override
- public int available() throws IOException {
- return channel.available();
- }
-
- public boolean isDataAvailable() throws IOException {
- return available() > 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
deleted file mode 100644
index ce4e420..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.io.socket.ssl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class SSLSocketChannelOutputStream extends OutputStream {
-
- private final SSLSocketChannel channel;
-
- public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
- this.channel = channel;
- }
-
- @Override
- public void write(final int b) throws IOException {
- channel.write(b);
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- channel.write(b);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- channel.write(b, off, len);
- }
-
- /**
- * Closes the underlying SSLSocketChannel, which also will close the
- * InputStream and the connection
- */
- @Override
- public void close() throws IOException {
- channel.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
deleted file mode 100644
index aaf37ea..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.InputStream;
-
-/**
- * This class is a slight modification of the BufferedInputStream in the java.io
- * package. The modification is that this implementation does not provide
- * synchronization on method calls, which means that this class is not suitable
- * for use by multiple threads. However, the absence of these synchronized
- * blocks results in potentially much better performance.
- */
-public class BufferedInputStream extends java.io.BufferedInputStream {
-
- public BufferedInputStream(final InputStream in) {
- super(in);
- }
-
- public BufferedInputStream(final InputStream in, final int size) {
- super(in, size);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
deleted file mode 100644
index eadfcab..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * This class is a slight modification of the
- * {@link java.io.BufferedOutputStream} class. This implementation differs in
- * that it does not mark methods as synchronized. This means that this class is
- * not suitable for writing by multiple concurrent threads. However, the removal
- * of the synchronized keyword results in potentially much better performance.
- */
-public class BufferedOutputStream extends FilterOutputStream {
-
- /**
- * The internal buffer where data is stored.
- */
- protected byte buf[];
-
- /**
- * The number of valid bytes in the buffer. This value is always in the
- * range <tt>0</tt> through <tt>buf.length</tt>; elements
- * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte data.
- */
- protected int count;
-
- /**
- * Creates a new buffered output stream to write data to the specified
- * underlying output stream.
- *
- * @param out the underlying output stream.
- */
- public BufferedOutputStream(OutputStream out) {
- this(out, 8192);
- }
-
- /**
- * Creates a new buffered output stream to write data to the specified
- * underlying output stream with the specified buffer size.
- *
- * @param out the underlying output stream.
- * @param size the buffer size.
- * @exception IllegalArgumentException if size <= 0.
- */
- public BufferedOutputStream(OutputStream out, int size) {
- super(out);
- if (size <= 0) {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buf = new byte[size];
- }
-
- /**
- * Flush the internal buffer
- */
- private void flushBuffer() throws IOException {
- if (count > 0) {
- out.write(buf, 0, count);
- count = 0;
- }
- }
-
- /**
- * Writes the specified byte to this buffered output stream.
- *
- * @param b the byte to be written.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public void write(int b) throws IOException {
- if (count >= buf.length) {
- flushBuffer();
- }
- buf[count++] = (byte) b;
- }
-
- /**
- * Writes <code>len</code> bytes from the specified byte array starting at
- * offset <code>off</code> to this buffered output stream.
- *
- * <p>
- * Ordinarily this method stores bytes from the given array into this
- * stream's buffer, flushing the buffer to the underlying output stream as
- * needed. If the requested length is at least as large as this stream's
- * buffer, however, then this method will flush the buffer and write the
- * bytes directly to the underlying output stream. Thus redundant
- * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- if (len >= buf.length) {
- /* If the request length exceeds the size of the output buffer,
- flush the output buffer and then write the data directly.
- In this way buffered streams will cascade harmlessly. */
- flushBuffer();
- out.write(b, off, len);
- return;
- }
- if (len >= buf.length - count) {
- flushBuffer();
- }
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
-
- /**
- * Flushes this buffered output stream. This forces any buffered output
- * bytes to be written out to the underlying output stream.
- *
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void flush() throws IOException {
- flushBuffer();
- out.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
deleted file mode 100644
index 284cd54..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.InputStream;
-
-/**
- * This class performs the same function as java.io.ByteArrayInputStream but
- * does not mark its methods as synchronized
- */
-public class ByteArrayInputStream extends InputStream {
-
- /**
- * An array of bytes that was provided by the creator of the stream.
- * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the
- * only bytes that can ever be read from the stream; element
- * <code>buf[pos]</code> is the next byte to be read.
- */
- protected byte buf[];
-
- /**
- * The index of the next character to read from the input stream buffer.
- * This value should always be nonnegative and not larger than the value of
- * <code>count</code>. The next byte to be read from the input stream buffer
- * will be <code>buf[pos]</code>.
- */
- protected int pos;
-
- /**
- * The currently marked position in the stream. ByteArrayInputStream objects
- * are marked at position zero by default when constructed. They may be
- * marked at another position within the buffer by the <code>mark()</code>
- * method. The current buffer position is set to this point by the
- * <code>reset()</code> method.
- * <p>
- * If no mark has been set, then the value of mark is the offset passed to
- * the constructor (or 0 if the offset was not supplied).
- *
- * @since JDK1.1
- */
- protected int mark = 0;
-
- /**
- * The index one greater than the last valid character in the input stream
- * buffer. This value should always be nonnegative and not larger than the
- * length of <code>buf</code>. It is one greater than the position of the
- * last byte within <code>buf</code> that can ever be read from the input
- * stream buffer.
- */
- protected int count;
-
- /**
- * Creates a <code>ByteArrayInputStream</code> so that it uses
- * <code>buf</code> as its buffer array. The buffer array is not copied. The
- * initial value of <code>pos</code> is <code>0</code> and the initial value
- * of <code>count</code> is the length of <code>buf</code>.
- *
- * @param buf the input buffer.
- */
- public ByteArrayInputStream(byte buf[]) {
- this.buf = buf;
- this.pos = 0;
- this.count = buf.length;
- }
-
- /**
- * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as
- * its buffer array. The initial value of <code>pos</code> is
- * <code>offset</code> and the initial value of <code>count</code> is the
- * minimum of <code>offset+length</code> and <code>buf.length</code>. The
- * buffer array is not copied. The buffer's mark is set to the specified
- * offset.
- *
- * @param buf the input buffer.
- * @param offset the offset in the buffer of the first byte to read.
- * @param length the maximum number of bytes to read from the buffer.
- */
- public ByteArrayInputStream(byte buf[], int offset, int length) {
- this.buf = buf;
- this.pos = offset;
- this.count = Math.min(offset + length, buf.length);
- this.mark = offset;
- }
-
- /**
- * Reads the next byte of data from this input stream. The value byte is
- * returned as an <code>int</code> in the range <code>0</code> to
- * <code>255</code>. If no byte is available because the end of the stream
- * has been reached, the value <code>-1</code> is returned.
- * <p>
- * This <code>read</code> method cannot block.
- *
- * @return the next byte of data, or <code>-1</code> if the end of the
- * stream has been reached.
- */
- @Override
- public int read() {
- return (pos < count) ? (buf[pos++] & 0xff) : -1;
- }
-
- /**
- * Reads up to <code>len</code> bytes of data into an array of bytes from
- * this input stream. If <code>pos</code> equals <code>count</code>, then
- * <code>-1</code> is returned to indicate end of file. Otherwise, the
- * number <code>k</code> of bytes read is equal to the smaller of
- * <code>len</code> and <code>count-pos</code>. If <code>k</code> is
- * positive, then bytes <code>buf[pos]</code> through
- * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through
- * <code>b[off+k-1]</code> in the manner performed by
- * <code>System.arraycopy</code>. The value <code>k</code> is added into
- * <code>pos</code> and <code>k</code> is returned.
- * <p>
- * This <code>read</code> method cannot block.
- *
- * @param b the buffer into which the data is read.
- * @param off the start offset in the destination array <code>b</code>
- * @param len the maximum number of bytes read.
- * @return the total number of bytes read into the buffer, or
- * <code>-1</code> if there is no more data because the end of the stream
- * has been reached.
- * @exception NullPointerException If <code>b</code> is <code>null</code>.
- * @exception IndexOutOfBoundsException If <code>off</code> is negative,
- * <code>len</code> is negative, or <code>len</code> is greater than
- * <code>b.length - off</code>
- */
- @Override
- public int read(byte b[], int off, int len) {
- if (b == null) {
- throw new NullPointerException();
- } else if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- if (pos >= count) {
- return -1;
- }
-
- int avail = count - pos;
- if (len > avail) {
- len = avail;
- }
- if (len <= 0) {
- return 0;
- }
- System.arraycopy(buf, pos, b, off, len);
- pos += len;
- return len;
- }
-
- /**
- * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
- * might be skipped if the end of the input stream is reached. The actual
- * number <code>k</code> of bytes to be skipped is equal to the smaller of
- * <code>n</code> and <code>count-pos</code>. The value <code>k</code> is
- * added into <code>pos</code> and <code>k</code> is returned.
- *
- * @param n the number of bytes to be skipped.
- * @return the actual number of bytes skipped.
- */
- @Override
- public long skip(long n) {
- long k = count - pos;
- if (n < k) {
- k = n < 0 ? 0 : n;
- }
-
- pos += k;
- return k;
- }
-
- /**
- * Returns the number of remaining bytes that can be read (or skipped over)
- * from this input stream.
- * <p>
- * The value returned is <code>count - pos</code>, which is the number
- * of bytes remaining to be read from the input buffer.
- *
- * @return the number of remaining bytes that can be read (or skipped over)
- * from this input stream without blocking.
- */
- @Override
- public int available() {
- return count - pos;
- }
-
- /**
- * Tests if this <code>InputStream</code> supports mark/reset. The
- * <code>markSupported</code> method of <code>ByteArrayInputStream</code>
- * always returns <code>true</code>.
- *
- * @since JDK1.1
- */
- @Override
- public boolean markSupported() {
- return true;
- }
-
- /**
- * Set the current marked position in the stream. ByteArrayInputStream
- * objects are marked at position zero by default when constructed. They may
- * be marked at another position within the buffer by this method.
- * <p>
- * If no mark has been set, then the value of the mark is the offset passed
- * to the constructor (or 0 if the offset was not supplied).
- *
- * <p>
- * Note: The <code>readAheadLimit</code> for this class has no meaning.
- *
- * @since JDK1.1
- */
- @Override
- public void mark(int readAheadLimit) {
- mark = pos;
- }
-
- /**
- * Resets the buffer to the marked position. The marked position is 0 unless
- * another position was marked or an offset was specified in the
- * constructor.
- */
- @Override
- public void reset() {
- pos = mark;
- }
-
- /**
- * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
- * this class can be called after the stream has been closed without
- * generating an <tt>IOException</tt>.
- * <p>
- */
- @Override
- public void close() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
deleted file mode 100644
index 459563b..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-
-/**
- * This class provides a more efficient implementation of the
- * java.io.ByteArrayOutputStream. The efficiency is gained in two ways:
- * <ul>
- * <li>The write methods are not synchronized</li>
- * <li>The class provides {@link #getUnderlyingBuffer()} and
- * {@link #getBufferLength()}, which can be used to access the underlying byte
- * array directly, rather than the System.arraycopy that {@link #toByteArray()}
- * uses
- * </ul>
- *
- */
-public class ByteArrayOutputStream extends OutputStream {
-
- /**
- * The buffer where data is stored.
- */
- protected byte buf[];
-
- /**
- * The number of valid bytes in the buffer.
- */
- protected int count;
-
- /**
- * Creates a new byte array output stream. The buffer capacity is initially
- * 32 bytes, though its size increases if necessary.
- */
- public ByteArrayOutputStream() {
- this(32);
- }
-
- /**
- * Creates a new byte array output stream, with a buffer capacity of the
- * specified size, in bytes.
- *
- * @param size the initial size.
- * @exception IllegalArgumentException if size is negative.
- */
- public ByteArrayOutputStream(int size) {
- if (size < 0) {
- throw new IllegalArgumentException("Negative initial size: "
- + size);
- }
- buf = new byte[size];
- }
-
- /**
- * Increases the capacity if necessary to ensure that it can hold at least
- * the number of elements specified by the minimum capacity argument.
- *
- * @param minCapacity the desired minimum capacity
- * @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted
- * as a request for the unsatisfiably large capacity
- * {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
- */
- private void ensureCapacity(int minCapacity) {
- // overflow-conscious code
- if (minCapacity - buf.length > 0) {
- grow(minCapacity);
- }
- }
-
- /**
- * Increases the capacity to ensure that it can hold at least the number of
- * elements specified by the minimum capacity argument.
- *
- * @param minCapacity the desired minimum capacity
- */
- private void grow(int minCapacity) {
- // overflow-conscious code
- int oldCapacity = buf.length;
- int newCapacity = oldCapacity << 1;
- if (newCapacity - minCapacity < 0) {
- newCapacity = minCapacity;
- }
- if (newCapacity < 0) {
- if (minCapacity < 0) // overflow
- {
- throw new OutOfMemoryError();
- }
- newCapacity = Integer.MAX_VALUE;
- }
- buf = Arrays.copyOf(buf, newCapacity);
- }
-
- /**
- * Writes the specified byte to this byte array output stream.
- *
- * @param b the byte to be written.
- */
- @Override
- public void write(int b) {
- ensureCapacity(count + 1);
- buf[count] = (byte) b;
- count += 1;
- }
-
- /**
- * Writes <code>len</code> bytes from the specified byte array starting at
- * offset <code>off</code> to this byte array output stream.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- */
- @Override
- public void write(byte b[], int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0)
- || ((off + len) - b.length > 0)) {
- throw new IndexOutOfBoundsException();
- }
- ensureCapacity(count + len);
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
-
- /**
- * Writes the complete contents of this byte array output stream to the
- * specified output stream argument, as if by calling the output stream's
- * write method using <code>out.write(buf, 0, count)</code>.
- *
- * @param out the output stream to which to write the data.
- * @exception IOException if an I/O error occurs.
- */
- public void writeTo(OutputStream out) throws IOException {
- out.write(buf, 0, count);
- }
-
- /**
- * Resets the <code>count</code> field of this byte array output stream to
- * zero, so that all currently accumulated output in the output stream is
- * discarded. The output stream can be used again, reusing the already
- * allocated buffer space.
- *
- * @see java.io.ByteArrayInputStream#count
- */
- public void reset() {
- count = 0;
- }
-
- /**
- * Creates a newly allocated byte array. Its size is the current size of
- * this output stream and the valid contents of the buffer have been copied
- * into it.
- *
- * @return the current contents of this output stream, as a byte array.
- * @see java.io.ByteArrayOutputStream#size()
- */
- public byte toByteArray ()
- [] {
- return Arrays.copyOf(buf, count);
- }
-
- /**
- * Returns the current size of the buffer.
- *
- * @return the value of the <code>count</code> field, which is the number of
- * valid bytes in this output stream.
- * @see java.io.ByteArrayOutputStream#count
- */
- public int size() {
- return count;
- }
-
- /**
- * Converts the buffer's contents into a string decoding bytes using the
- * platform's default character set. The length of the new <tt>String</tt>
- * is a function of the character set, and hence may not be equal to the
- * size of the buffer.
- *
- * <p>
- * This method always replaces malformed-input and unmappable-character
- * sequences with the default replacement string for the platform's default
- * character set. The {@linkplain java.nio.charset.CharsetDecoder} class
- * should be used when more control over the decoding process is required.
- *
- * @return String decoded from the buffer's contents.
- * @since JDK1.1
- */
- @Override
- public String toString() {
- return new String(buf, 0, count);
- }
-
- /**
- * Converts the buffer's contents into a string by decoding the bytes using
- * the specified {@link java.nio.charset.Charset charsetName}. The length of
- * the new <tt>String</tt> is a function of the charset, and hence may not
- * be equal to the length of the byte array.
- *
- * <p>
- * This method always replaces malformed-input and unmappable-character
- * sequences with this charset's default replacement string. The {@link
- * java.nio.charset.CharsetDecoder} class should be used when more control
- * over the decoding process is required.
- *
- * @param charsetName the name of a supported
- * {@linkplain java.nio.charset.Charset <code>charset</code>}
- * @return String decoded from the buffer's contents.
- * @exception UnsupportedEncodingException If the named charset is not
- * supported
- * @since JDK1.1
- */
- public String toString(String charsetName) throws UnsupportedEncodingException {
- return new String(buf, 0, count, charsetName);
- }
-
- /**
- * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
- * this class can be called after the stream has been closed without
- * generating an <tt>IOException</tt>.
- * <p>
- *
- */
- @Override
- public void close() {
- }
-
- public byte[] getUnderlyingBuffer() {
- return buf;
- }
-
- public int getBufferLength() {
- return count;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
deleted file mode 100644
index 8294af3..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public class ByteCountingInputStream extends InputStream {
-
- private final InputStream in;
- private long bytesRead = 0L;
- private long bytesSkipped = 0L;
-
- private long bytesSinceMark = 0L;
-
- public ByteCountingInputStream(final InputStream in) {
- this.in = in;
- }
-
- @Override
- public int read() throws IOException {
- final int fromSuper = in.read();
- if (fromSuper >= 0) {
- bytesRead++;
- bytesSinceMark++;
- }
- return fromSuper;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- final int fromSuper = in.read(b, off, len);
- if (fromSuper >= 0) {
- bytesRead += fromSuper;
- bytesSinceMark += fromSuper;
- }
-
- return fromSuper;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public long skip(final long n) throws IOException {
- final long skipped = in.skip(n);
- if (skipped >= 0) {
- bytesSkipped += skipped;
- bytesSinceMark += skipped;
- }
- return skipped;
- }
-
- public long getBytesRead() {
- return bytesRead;
- }
-
- public long getBytesSkipped() {
- return bytesSkipped;
- }
-
- public long getBytesConsumed() {
- return getBytesRead() + getBytesSkipped();
- }
-
- @Override
- public void mark(final int readlimit) {
- in.mark(readlimit);
-
- bytesSinceMark = 0L;
- }
-
- @Override
- public boolean markSupported() {
- return in.markSupported();
- }
-
- @Override
- public void reset() throws IOException {
- in.reset();
- bytesRead -= bytesSinceMark;
- }
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
deleted file mode 100644
index d8e1a42..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class ByteCountingOutputStream extends OutputStream {
-
- private final OutputStream out;
- private long bytesWritten = 0L;
-
- public ByteCountingOutputStream(final OutputStream out) {
- this.out = out;
- }
-
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- bytesWritten++;
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- ;
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- bytesWritten += len;
- }
-
- public long getBytesWritten() {
- return bytesWritten;
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
deleted file mode 100644
index 1dd90f5..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.DataOutput;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * This class is different from java.io.DataOutputStream in that it does
- * synchronize on its methods.
- */
-public class DataOutputStream extends FilterOutputStream implements DataOutput {
-
- /**
- * The number of bytes written to the data output stream so far. If this
- * counter overflows, it will be wrapped to Integer.MAX_VALUE.
- */
- protected int written;
-
- /**
- * bytearr is initialized on demand by writeUTF
- */
- private byte[] bytearr = null;
-
- /**
- * Creates a new data output stream to write data to the specified
- * underlying output stream. The counter <code>written</code> is set to
- * zero.
- *
- * @param out the underlying output stream, to be saved for later use.
- * @see java.io.FilterOutputStream#out
- */
- public DataOutputStream(OutputStream out) {
- super(out);
- }
-
- /**
- * Increases the written counter by the specified value until it reaches
- * Integer.MAX_VALUE.
- */
- private void incCount(int value) {
- int temp = written + value;
- if (temp < 0) {
- temp = Integer.MAX_VALUE;
- }
- written = temp;
- }
-
- /**
- * Writes the specified byte (the low eight bits of the argument
- * <code>b</code>) to the underlying output stream. If no exception is
- * thrown, the counter <code>written</code> is incremented by
- * <code>1</code>.
- * <p>
- * Implements the <code>write</code> method of <code>OutputStream</code>.
- *
- * @param b the <code>byte</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- incCount(1);
- }
-
- /**
- * Writes <code>len</code> bytes from the specified byte array starting at
- * offset <code>off</code> to the underlying output stream. If no exception
- * is thrown, the counter <code>written</code> is incremented by
- * <code>len</code>.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public void write(byte b[], int off, int len) throws IOException {
- out.write(b, off, len);
- incCount(len);
- }
-
- /**
- * Flushes this data output stream. This forces any buffered output bytes to
- * be written out to the stream.
- * <p>
- * The <code>flush</code> method of <code>DataOutputStream</code> calls the
- * <code>flush</code> method of its underlying output stream.
- *
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.io.OutputStream#flush()
- */
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- /**
- * Writes a <code>boolean</code> to the underlying output stream as a 1-byte
- * value. The value <code>true</code> is written out as the value
- * <code>(byte)1</code>; the value <code>false</code> is written out as the
- * value <code>(byte)0</code>. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>1</code>.
- *
- * @param v a <code>boolean</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeBoolean(boolean v) throws IOException {
- out.write(v ? 1 : 0);
- incCount(1);
- }
-
- /**
- * Writes out a <code>byte</code> to the underlying output stream as a
- * 1-byte value. If no exception is thrown, the counter <code>written</code>
- * is incremented by <code>1</code>.
- *
- * @param v a <code>byte</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeByte(int v) throws IOException {
- out.write(v);
- incCount(1);
- }
-
- /**
- * Writes a <code>short</code> to the underlying output stream as two bytes,
- * high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>2</code>.
- *
- * @param v a <code>short</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeShort(int v) throws IOException {
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(2);
- }
-
- /**
- * Writes a <code>char</code> to the underlying output stream as a 2-byte
- * value, high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>2</code>.
- *
- * @param v a <code>char</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeChar(int v) throws IOException {
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(2);
- }
-
- /**
- * Writes an <code>int</code> to the underlying output stream as four bytes,
- * high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>4</code>.
- *
- * @param v an <code>int</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeInt(int v) throws IOException {
- out.write((v >>> 24) & 0xFF);
- out.write((v >>> 16) & 0xFF);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- incCount(4);
- }
-
- private final byte writeBuffer[] = new byte[8];
-
- /**
- * Writes a <code>long</code> to the underlying output stream as eight
- * bytes, high byte first. In no exception is thrown, the counter
- * <code>written</code> is incremented by <code>8</code>.
- *
- * @param v a <code>long</code> to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeLong(long v) throws IOException {
- writeBuffer[0] = (byte) (v >>> 56);
- writeBuffer[1] = (byte) (v >>> 48);
- writeBuffer[2] = (byte) (v >>> 40);
- writeBuffer[3] = (byte) (v >>> 32);
- writeBuffer[4] = (byte) (v >>> 24);
- writeBuffer[5] = (byte) (v >>> 16);
- writeBuffer[6] = (byte) (v >>> 8);
- writeBuffer[7] = (byte) (v);
- out.write(writeBuffer, 0, 8);
- incCount(8);
- }
-
- /**
- * Converts the float argument to an <code>int</code> using the
- * <code>floatToIntBits</code> method in class <code>Float</code>, and then
- * writes that <code>int</code> value to the underlying output stream as a
- * 4-byte quantity, high byte first. If no exception is thrown, the counter
- * <code>written</code> is incremented by <code>4</code>.
- *
- * @param v a <code>float</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Float#floatToIntBits(float)
- */
- @Override
- public final void writeFloat(float v) throws IOException {
- writeInt(Float.floatToIntBits(v));
- }
-
- /**
- * Converts the double argument to a <code>long</code> using the
- * <code>doubleToLongBits</code> method in class <code>Double</code>, and
- * then writes that <code>long</code> value to the underlying output stream
- * as an 8-byte quantity, high byte first. If no exception is thrown, the
- * counter <code>written</code> is incremented by <code>8</code>.
- *
- * @param v a <code>double</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- * @see java.lang.Double#doubleToLongBits(double)
- */
- @Override
- public final void writeDouble(double v) throws IOException {
- writeLong(Double.doubleToLongBits(v));
- }
-
- /**
- * Writes out the string to the underlying output stream as a sequence of
- * bytes. Each character in the string is written out, in sequence, by
- * discarding its high eight bits. If no exception is thrown, the counter
- * <code>written</code> is incremented by the length of <code>s</code>.
- *
- * @param s a string of bytes to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeBytes(String s) throws IOException {
- int len = s.length();
- for (int i = 0; i < len; i++) {
- out.write((byte) s.charAt(i));
- }
- incCount(len);
- }
-
- /**
- * Writes a string to the underlying output stream as a sequence of
- * characters. Each character is written to the data output stream as if by
- * the <code>writeChar</code> method. If no exception is thrown, the counter
- * <code>written</code> is incremented by twice the length of
- * <code>s</code>.
- *
- * @param s a <code>String</code> value to be written.
- * @exception IOException if an I/O error occurs.
- * @see java.io.DataOutputStream#writeChar(int)
- * @see java.io.FilterOutputStream#out
- */
- @Override
- public final void writeChars(String s) throws IOException {
- int len = s.length();
- for (int i = 0; i < len; i++) {
- int v = s.charAt(i);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- }
- incCount(len * 2);
- }
-
- /**
- * Writes a string to the underlying output stream using
- * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
- * encoding in a machine-independent manner.
- * <p>
- * First, two bytes are written to the output stream as if by the
- * <code>writeShort</code> method giving the number of bytes to follow. This
- * value is the number of bytes actually written out, not the length of the
- * string. Following the length, each character of the string is output, in
- * sequence, using the modified UTF-8 encoding for the character. If no
- * exception is thrown, the counter <code>written</code> is incremented by
- * the total number of bytes written to the output stream. This will be at
- * least two plus the length of <code>str</code>, and at most two plus
- * thrice the length of <code>str</code>.
- *
- * @param str a string to be written.
- * @exception IOException if an I/O error occurs.
- */
- @Override
- public final void writeUTF(String str) throws IOException {
- writeUTF(str, this);
- }
-
- /**
- * Writes a string to the specified DataOutput using
- * <a href="DataInput.html#modified-utf-8">modified UTF-8</a>
- * encoding in a machine-independent manner.
- * <p>
- * First, two bytes are written to out as if by the <code>writeShort</code>
- * method giving the number of bytes to follow. This value is the number of
- * bytes actually written out, not the length of the string. Following the
- * length, each character of the string is output, in sequence, using the
- * modified UTF-8 encoding for the character. If no exception is thrown, the
- * counter <code>written</code> is incremented by the total number of bytes
- * written to the output stream. This will be at least two plus the length
- * of <code>str</code>, and at most two plus thrice the length of
- * <code>str</code>.
- *
- * @param str a string to be written.
- * @param out destination to write to
- * @return The number of bytes written out.
- * @exception IOException if an I/O error occurs.
- */
- static int writeUTF(String str, DataOutput out) throws IOException {
- int strlen = str.length();
- int utflen = 0;
- int c, count = 0;
-
- /* use charAt instead of copying String to char array */
- for (int i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- utflen++;
- } else if (c > 0x07FF) {
- utflen += 3;
- } else {
- utflen += 2;
- }
- }
-
- if (utflen > 65535) {
- throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes");
- }
-
- byte[] bytearr = null;
- if (out instanceof DataOutputStream) {
- DataOutputStream dos = (DataOutputStream) out;
- if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) {
- dos.bytearr = new byte[(utflen * 2) + 2];
- }
- bytearr = dos.bytearr;
- } else {
- bytearr = new byte[utflen + 2];
- }
-
- bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
- bytearr[count++] = (byte) ((utflen) & 0xFF);
-
- int i = 0;
- for (i = 0; i < strlen; i++) {
- c = str.charAt(i);
- if (!((c >= 0x0001) && (c <= 0x007F))) {
- break;
- }
- bytearr[count++] = (byte) c;
- }
-
- for (; i < strlen; i++) {
- c = str.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- bytearr[count++] = (byte) c;
-
- } else if (c > 0x07FF) {
- bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
- bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
- bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
- } else {
- bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
- bytearr[count++] = (byte) (0x80 | ((c) & 0x3F));
- }
- }
- out.write(bytearr, 0, utflen + 2);
- return utflen + 2;
- }
-
- /**
- * Returns the current value of the counter <code>written</code>, the number
- * of bytes written to this data output stream so far. If the counter
- * overflows, it will be wrapped to Integer.MAX_VALUE.
- *
- * @return the value of the <code>written</code> field.
- * @see java.io.DataOutputStream#written
- */
- public final int size() {
- return written;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
deleted file mode 100644
index 2864bbb..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * <p>
- * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the
- * constructor to provide a compression level, and uses a default value of 1,
- * rather than 5.
- * </p>
- */
-public class GZIPOutputStream extends java.util.zip.GZIPOutputStream {
-
- public static final int DEFAULT_COMPRESSION_LEVEL = 1;
-
- public GZIPOutputStream(final OutputStream out) throws IOException {
- this(out, DEFAULT_COMPRESSION_LEVEL);
- }
-
- public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException {
- super(out);
- def.setLevel(compressionLevel);
- }
-}