You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/20 06:52:28 UTC
[6/8] incubator-nifi git commit: NIFI-189 Collapsed several commons
libs into one
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java b/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
deleted file mode 100644
index 33fb444..0000000
--- a/commons/nifi-utils/src/main/java/org/apache/nifi/io/UpdateMonitor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.io;
-
-import java.io.IOException;
-import java.nio.file.Path;
-
-public interface UpdateMonitor {
-
- Object getCurrentState(Path path) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
new file mode 100644
index 0000000..77c34c9
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * {@link VersionNegotiator} backed by a fixed list of supported versions. The
+ * order in which the versions are passed to the constructor is the order of
+ * preference: the first one is the preferred version and the initial current
+ * version.
+ */
+public class StandardVersionNegotiator implements VersionNegotiator {
+
+    // Supported versions in the caller-supplied order; exposed read-only.
+    private final List<Integer> versions;
+    // Version currently selected; starts out as the first supported version.
+    private int curVersion;
+
+    /**
+     * @param supportedVersions the versions this resource supports, most
+     * preferred first
+     * @throws NullPointerException if supportedVersions is null
+     * @throws IllegalArgumentException if no version is given
+     */
+    public StandardVersionNegotiator(final int... supportedVersions) {
+        Objects.requireNonNull(supportedVersions);
+        if (supportedVersions.length == 0) {
+            throw new IllegalArgumentException("At least one version must be supported");
+        }
+
+        final List<Integer> boxed = new ArrayList<>();
+        for (int i = 0; i < supportedVersions.length; i++) {
+            boxed.add(supportedVersions[i]);
+        }
+
+        this.curVersion = supportedVersions[0];
+        this.versions = Collections.unmodifiableList(boxed);
+    }
+
+    @Override
+    public int getVersion() {
+        return curVersion;
+    }
+
+    @Override
+    public void setVersion(final int version) throws IllegalArgumentException {
+        if (isVersionSupported(version)) {
+            this.curVersion = version;
+            return;
+        }
+
+        throw new IllegalArgumentException("Version " + version + " is not supported");
+    }
+
+    @Override
+    public int getPreferredVersion() {
+        final Integer mostPreferred = versions.get(0);
+        return mostPreferred;
+    }
+
+    /**
+     * Walks the supported versions in preference order and returns the first
+     * one that does not exceed maxVersion, or null if every supported version
+     * is greater than maxVersion.
+     */
+    @Override
+    public Integer getPreferredVersion(final int maxVersion) {
+        for (int i = 0; i < versions.size(); i++) {
+            final Integer candidate = versions.get(i);
+            if (candidate <= maxVersion) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isVersionSupported(final int version) {
+        return versions.contains(Integer.valueOf(version));
+    }
+
+    /**
+     * @return an unmodifiable view of the supported versions, in preference
+     * order
+     */
+    @Override
+    public List<Integer> getSupportedVersions() {
+        return versions;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
new file mode 100644
index 0000000..74f9b3d
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.List;
+
+/**
+ * Describes a resource that supports one or more protocol versions and lets a
+ * caller query the supported versions and select the one to use.
+ */
+public interface VersionNegotiator {
+
+ /**
+ * @return the currently configured Version of this resource
+ */
+ int getVersion();
+
+ /**
+ * Sets the version of this resource to the specified version. Only the
+ * lower byte of the version is relevant.
+ * NOTE(review): StandardVersionNegotiator stores the full int unchanged;
+ * confirm whether the lower-byte restriction still applies.
+ *
+ * @param version the version to use from now on
+ * @throws IllegalArgumentException if the given Version is not supported by
+ * this resource, as is indicated by the {@link #isVersionSupported(int)}
+ * method
+ */
+ void setVersion(int version) throws IllegalArgumentException;
+
+ /**
+ *
+ * @return the Version of this resource that is preferred
+ */
+ int getPreferredVersion();
+
+ /**
+ * Gets the preferred version of this resource that is no greater than the
+ * given maxVersion. If no supported version is less than or equal to
+ * <code>maxVersion</code>, then <code>null</code> is returned
+ *
+ * @param maxVersion the highest version that is acceptable to the caller
+ * @return the preferred version that is no greater than
+ * <code>maxVersion</code>, or <code>null</code> if there is none
+ */
+ Integer getPreferredVersion(int maxVersion);
+
+ /**
+ * Indicates whether or not the specified version is supported by this
+ * resource
+ *
+ * @param version the version to check
+ * @return <code>true</code> if the version is supported,
+ * <code>false</code> otherwise
+ */
+ boolean isVersionSupported(int version);
+
+ /**
+ * @return the versions that are supported by this resource
+ */
+ List<Integer> getSupportedVersions();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
new file mode 100644
index 0000000..05fd915
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Indicates that the user disabled transmission while communications were
+ * taking place with a peer.
+ *
+ * This is an unchecked exception, so it can be thrown from stream methods
+ * whose signatures only declare IOException (see InterruptableInputStream
+ * and InterruptableOutputStream in org.apache.nifi.remote.io).
+ */
+public class TransmissionDisabledException extends RuntimeException {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
new file mode 100644
index 0000000..71cf894
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * Reads the chunked format written by {@link CompressionOutputStream}. Each
+ * chunk consists of the 4-byte SYNC marker, the decompressed size and the
+ * compressed size (each a 4-byte big-endian int) and the deflated payload.
+ * After every chunk a single byte tells whether another chunk follows (1) or
+ * not (0, or end of the underlying stream).
+ */
+public class CompressionInputStream extends InputStream {
+
+    private final InputStream in;
+    private final Inflater inflater;
+
+    private byte[] compressedBuffer;
+    private byte[] buffer;
+
+    private int bufferIndex;
+    private boolean eos = false; // whether or not we've reached the end of stream
+    private boolean allDataRead = false; // different from eos b/c eos means allDataRead == true && buffer is empty
+
+    private final byte[] fourByteBuffer = new byte[4];
+
+    /**
+     * @param in the stream to read compressed chunks from; it is never closed
+     * by this class
+     */
+    public CompressionInputStream(final InputStream in) {
+        this.in = in;
+        inflater = new Inflater();
+
+        // Empty buffer with an index past its end: the first read() sees an
+        // "empty" buffer and loads the first chunk.
+        buffer = new byte[0];
+        compressedBuffer = new byte[0];
+        bufferIndex = 1;
+    }
+
+    /**
+     * Formats the given bytes as an upper-case hex string prefixed with "0x",
+     * two digits per byte.
+     */
+    private String toHex(final byte[] array) {
+        final StringBuilder sb = new StringBuilder("0x");
+        for (final byte b : array) {
+            // Mask to the unsigned value; a negative byte would otherwise be
+            // sign-extended to an int and printed as eight digits (FFFFFF80).
+            final String hex = Integer.toHexString(b & 0xFF).toUpperCase();
+            if (hex.length() == 1) {
+                sb.append("0");
+            }
+            sb.append(hex);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Reads the header of the next chunk and allocates the decompressed and
+     * compressed buffers according to the sizes it declares.
+     *
+     * @throws IOException if the SYNC marker is missing, a declared size is
+     * negative, or the underlying stream ends prematurely
+     */
+    protected void readChunkHeader() throws IOException {
+        // Ensure that we have a valid SYNC chunk
+        fillBuffer(fourByteBuffer);
+        if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
+            throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
+        }
+
+        // determine the size of the decompressed buffer
+        fillBuffer(fourByteBuffer);
+        buffer = new byte[toSize(fourByteBuffer, "decompressed")];
+
+        // determine the size of the compressed buffer
+        fillBuffer(fourByteBuffer);
+        compressedBuffer = new byte[toSize(fourByteBuffer, "compressed")];
+
+        bufferIndex = buffer.length; // indicate that buffer is empty
+    }
+
+    /**
+     * Decodes a chunk size and rejects negative values, so that a corrupt
+     * header is reported as an IOException rather than surfacing as a
+     * NegativeArraySizeException from the array allocation.
+     */
+    private int toSize(final byte[] data, final String description) throws IOException {
+        final int size = toInt(data);
+        if (size < 0) {
+            throw new IOException("Invalid CompressionInputStream. Chunk header declares a negative " + description + " size: " + size);
+        }
+        return size;
+    }
+
+    /**
+     * Decodes four bytes as a big-endian int.
+     */
+    private int toInt(final byte[] data) {
+        return ((data[0] & 0xFF) << 24)
+                | ((data[1] & 0xFF) << 16)
+                | ((data[2] & 0xFF) << 8)
+                | (data[3] & 0xFF);
+    }
+
+    /**
+     * Loads and inflates the next chunk into the buffer, or marks the end of
+     * stream if the previous chunk was flagged as the last one.
+     *
+     * @throws IOException if the chunk is malformed or cannot be inflated
+     */
+    protected void bufferAndDecompress() throws IOException {
+        if (allDataRead) {
+            eos = true;
+            return;
+        }
+
+        readChunkHeader();
+        fillBuffer(compressedBuffer);
+
+        inflater.setInput(compressedBuffer);
+        try {
+            inflater.inflate(buffer);
+        } catch (final DataFormatException e) {
+            throw new IOException(e);
+        }
+        // Every chunk is an independent deflate stream.
+        inflater.reset();
+
+        bufferIndex = 0;
+        // Trailing byte: 1 = another chunk follows, 0 or EOF (-1) = done.
+        final int moreDataByte = in.read();
+        if (moreDataByte < 1) {
+            allDataRead = true;
+        } else if (moreDataByte > 1) {
+            throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
+        }
+    }
+
+    /**
+     * Fills the given array completely from the underlying stream.
+     *
+     * @throws EOFException if the stream ends before the array is full
+     */
+    private void fillBuffer(final byte[] buffer) throws IOException {
+        int len;
+        int bytesLeft = buffer.length;
+        int bytesRead = 0;
+        while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
+            bytesLeft -= len;
+            bytesRead += len;
+        }
+
+        if (bytesRead < buffer.length) {
+            throw new EOFException();
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return bufferIndex >= buffer.length;
+    }
+
+    /**
+     * @return the next decompressed byte as a value in the range 0-255, or -1
+     * at the end of the stream
+     */
+    @Override
+    public int read() throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        // InputStream.read() must return 0-255. Without the mask, bytes
+        // 0x80-0xFF come back negative and 0xFF looks like end of stream.
+        return buffer[bufferIndex++] & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Copies up to len bytes from the current chunk into b. At most one chunk
+     * is consumed per call, so fewer than len bytes may be returned.
+     *
+     * @return the number of bytes copied, or -1 at the end of the stream
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        final int free = buffer.length - bufferIndex;
+        final int bytesToTransfer = Math.min(len, free);
+        System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
+        bufferIndex += bytesToTransfer;
+
+        return bytesToTransfer;
+    }
+
+    /**
+     * Releases the resources held by the Inflater. Does NOT close the
+     * underlying InputStream. After this call, reads report end of stream.
+     *
+     * @throws java.io.IOException never thrown by this implementation
+     */
+    @Override
+    public void close() throws IOException {
+        // Stop serving data so that no later read touches the ended Inflater.
+        eos = true;
+        inflater.end();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
new file mode 100644
index 0000000..bc46b0f
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+
+/**
+ * Buffers written bytes and emits them as independently deflated chunks. Each
+ * chunk is written as the 4-byte SYNC marker, the uncompressed length and the
+ * compressed length (each a 4-byte big-endian int) and the deflated payload.
+ * Every chunk after the first is preceded by a 1 byte, and {@link #close()}
+ * writes a final 0 byte to mark the end of the data.
+ */
+public class CompressionOutputStream extends OutputStream {
+
+    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
+    public static final int MIN_BUFFER_SIZE = 8 << 10;
+
+    private final OutputStream out;
+    private final Deflater deflater;
+
+    private final byte[] buffer;
+    // Scratch space for the deflated chunk; grown on demand when a chunk does
+    // not fit (e.g. incompressible data with a large buffer).
+    private byte[] compressed;
+
+    private int bufferIndex = 0;
+    private boolean dataWritten = false;
+    private boolean closed = false;
+
+    public CompressionOutputStream(final OutputStream outStream) {
+        this(outStream, DEFAULT_BUFFER_SIZE);
+    }
+
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
+        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
+    }
+
+    /**
+     * @param outStream the stream that receives the compressed chunks
+     * @param bufferSize number of uncompressed bytes collected per chunk
+     * @param level the Deflater compression level
+     * @param strategy the Deflater compression strategy
+     * @throws IllegalArgumentException if bufferSize is smaller than
+     * {@link #MIN_BUFFER_SIZE}
+     */
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
+        }
+
+        this.out = outStream;
+        this.deflater = new Deflater(level);
+        this.deflater.setStrategy(strategy);
+        buffer = new byte[bufferSize];
+        compressed = new byte[bufferSize + 64];
+    }
+
+    /**
+     * Compresses the currently buffered chunk of data and sends it to the
+     * output stream. Does nothing if no data is buffered.
+     *
+     * @throws IOException if writing to the underlying stream fails
+     */
+    protected void compressAndWrite() throws IOException {
+        if (bufferIndex <= 0) {
+            return;
+        }
+
+        deflater.setInput(buffer, 0, bufferIndex);
+        deflater.finish();
+
+        int compressedBytes = deflater.deflate(compressed);
+        // One deflate() call only produces what fits into the output array.
+        // Keep going with a larger array until the deflater reports that the
+        // whole chunk has been emitted; otherwise the tail would be dropped.
+        while (!deflater.finished()) {
+            final byte[] larger = new byte[compressed.length * 2];
+            System.arraycopy(compressed, 0, larger, 0, compressedBytes);
+            compressed = larger;
+            compressedBytes += deflater.deflate(compressed, compressedBytes, compressed.length - compressedBytes);
+        }
+
+        writeChunkHeader(compressedBytes);
+        out.write(compressed, 0, compressedBytes);
+
+        bufferIndex = 0;
+        deflater.reset();
+    }
+
+    private void writeChunkHeader(final int compressedBytes) throws IOException {
+        // If we have already written data, write out a '1' to indicate that we have more data; when we close
+        // the stream, we instead write a '0' to indicate that we are finished sending data.
+        if (dataWritten) {
+            out.write(1);
+        }
+        out.write(SYNC_BYTES);
+        dataWritten = true;
+
+        // bufferIndex is the number of uncompressed bytes in this chunk
+        writeInt(out, bufferIndex);
+        writeInt(out, compressedBytes);
+    }
+
+    /**
+     * Writes the value as four bytes, most significant byte first.
+     */
+    private void writeInt(final OutputStream out, final int val) throws IOException {
+        out.write(val >>> 24);
+        out.write(val >>> 16);
+        out.write(val >>> 8);
+        out.write(val);
+    }
+
+    protected boolean bufferFull() {
+        return bufferIndex >= buffer.length;
+    }
+
+    private void ensureOpen() throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        ensureOpen();
+        buffer[bufferIndex++] = (byte) (b & 0xFF);
+        if (bufferFull()) {
+            compressAndWrite();
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        ensureOpen();
+        int bytesLeft = len;
+        while (bytesLeft > 0) {
+            final int free = buffer.length - bufferIndex;
+            final int bytesThisIteration = Math.min(bytesLeft, free);
+            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
+            bufferIndex += bytesThisIteration;
+
+            bytesLeft -= bytesThisIteration;
+            if (bufferFull()) {
+                compressAndWrite();
+            }
+        }
+    }
+
+    /**
+     * Emits any buffered data as a chunk and flushes the underlying stream.
+     */
+    @Override
+    public void flush() throws IOException {
+        compressAndWrite();
+        // OutputStream.flush() is a no-op; the wrapped stream is the one that
+        // has to be flushed for the chunk to actually be sent.
+        out.flush();
+    }
+
+    /**
+     * Emits any buffered data, writes the end-of-data marker, flushes the
+     * underlying stream and releases the Deflater. Does NOT close the
+     * underlying OutputStream. Calling close() again has no effect.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            // A second close must not write another terminator byte.
+            return;
+        }
+        closed = true;
+
+        try {
+            compressAndWrite();
+            out.write(0); // indicate that the stream is finished.
+            out.flush();
+        } finally {
+            deflater.end();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
new file mode 100644
index 0000000..e03dfbf
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * InputStream wrapper that delegates every call to the wrapped stream until
+ * {@link #interrupt()} is called. From then on every method throws a
+ * {@link TransmissionDisabledException} instead of delegating. The check is
+ * made before delegating only; a call that is already inside the wrapped
+ * stream is not affected.
+ */
+public class InterruptableInputStream extends InputStream {
+
+    // volatile so that an interrupt issued by another thread is visible here
+    private volatile boolean interrupted = false;
+    private final InputStream in;
+
+    public InterruptableInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    /**
+     * Throws if {@link #interrupt()} has been called on this stream.
+     */
+    private void verifyNotInterrupted() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        verifyNotInterrupted();
+        return in.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        verifyNotInterrupted();
+        return in.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        verifyNotInterrupted();
+        return in.read(b, off, len);
+    }
+
+    @Override
+    public int available() throws IOException {
+        verifyNotInterrupted();
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        verifyNotInterrupted();
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        verifyNotInterrupted();
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        verifyNotInterrupted();
+        return in.markSupported();
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        verifyNotInterrupted();
+        in.reset();
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        verifyNotInterrupted();
+        return in.skip(n);
+    }
+
+    /**
+     * Marks this stream as interrupted; every subsequent call on it throws a
+     * TransmissionDisabledException.
+     */
+    public void interrupt() {
+        interrupted = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
new file mode 100644
index 0000000..cba5be6
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * OutputStream wrapper that delegates every call to the wrapped stream until
+ * {@link #interrupt()} is called. From then on every method throws a
+ * {@link TransmissionDisabledException} instead of delegating. The check is
+ * made before delegating only; a call that is already inside the wrapped
+ * stream is not affected.
+ */
+public class InterruptableOutputStream extends OutputStream {
+
+    private final OutputStream out;
+    // volatile so that an interrupt issued by another thread is visible here
+    private volatile boolean interrupted = false;
+
+    public InterruptableOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    /**
+     * Throws if {@link #interrupt()} has been called on this stream.
+     */
+    private void verifyNotInterrupted() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        verifyNotInterrupted();
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        verifyNotInterrupted();
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        verifyNotInterrupted();
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        verifyNotInterrupted();
+        out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        verifyNotInterrupted();
+        out.flush();
+    }
+
+    /**
+     * Marks this stream as interrupted; every subsequent call on it throws a
+     * TransmissionDisabledException.
+     */
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
new file mode 100644
index 0000000..68913bd
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps a ByteBuffer and remembers whether it is currently laid out for
+ * writing into it ({@link Direction#WRITE}) or for reading out of it
+ * ({@link Direction#READ}), flipping the buffer when the requested direction
+ * differs from the current one.
+ */
+public class BufferStateManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class);
+
+ // The managed buffer; replaced by a larger one when ensureSize has to grow it.
+ private ByteBuffer buffer;
+ // Current layout of the buffer; WRITE unless the caller says otherwise.
+ private Direction direction = Direction.WRITE;
+
+ /**
+ * Manages the given buffer, treating it as being in WRITE direction.
+ *
+ * @param buffer the buffer to manage
+ */
+ public BufferStateManager(final ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ /**
+ * Manages the given buffer, treating it as being in the given direction.
+ *
+ * @param buffer the buffer to manage
+ * @param direction the direction the buffer is currently laid out for
+ */
+ public BufferStateManager(final ByteBuffer buffer, final Direction direction) {
+ this.buffer = buffer;
+ this.direction = direction;
+ }
+
+ /**
+ * Ensures that the buffer is at least as big as the size specified,
+ * resizing the buffer if necessary. This operation MAY change the direction
+ * of the buffer.
+ *
+ * When the capacity is too small, a replacement is created with
+ * ByteBuffer.allocate, the remaining content of the old buffer is copied
+ * into it, and the direction becomes WRITE. When the capacity already
+ * suffices, nothing is changed.
+ *
+ * @param requiredSize the minimum capacity, in bytes
+ */
+ public void ensureSize(final int requiredSize) {
+ if (buffer.capacity() < requiredSize) {
+ final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize);
+
+ // we have to read buffer so make sure the direction is correct.
+ if (direction == Direction.WRITE) {
+ buffer.flip();
+ }
+
+ // Copy from buffer to newBuffer
+ // (put transfers the bytes between the old buffer's position and limit)
+ newBuffer.put(buffer);
+
+ // Swap the buffers
+ buffer = newBuffer;
+
+ // the new buffer is ready to be written to
+ direction = Direction.WRITE;
+ }
+ }
+
+ /**
+ * Grows the buffer if needed and lays it out for writing: when coming from
+ * READ direction the position is moved to the old limit so new bytes are
+ * appended, and the limit is always raised to the capacity.
+ *
+ * NOTE(review): when switching from READ, bytes in front of the old limit
+ * stay in the buffer, including any that were already read; presumably
+ * callers use compact() or clear() to discard consumed bytes - confirm.
+ *
+ * @param requiredSize the minimum capacity, in bytes
+ * @return the managed buffer, which is a different instance than before if
+ * the buffer had to grow
+ */
+ public ByteBuffer prepareForWrite(final int requiredSize) {
+ ensureSize(requiredSize);
+
+ if (direction == Direction.READ) {
+ direction = Direction.WRITE;
+ buffer.position(buffer.limit());
+ }
+
+ buffer.limit(buffer.capacity());
+ return buffer;
+ }
+
+ /**
+ * Grows the buffer if needed and lays it out for reading: when coming from
+ * WRITE direction the buffer is flipped; when already in READ direction it
+ * is returned as it is.
+ *
+ * @param requiredSize the minimum capacity, in bytes
+ * @return the managed buffer, which is a different instance than before if
+ * the buffer had to grow
+ */
+ public ByteBuffer prepareForRead(final int requiredSize) {
+ ensureSize(requiredSize);
+
+ if (direction == Direction.WRITE) {
+ direction = Direction.READ;
+ buffer.flip();
+ }
+
+ return buffer;
+ }
+
+ /**
+ * Clears the contents of the buffer and sets direction to WRITE
+ */
+ public void clear() {
+ logger.debug("Clearing {}", buffer);
+ buffer.clear();
+ direction = Direction.WRITE;
+ }
+
+ /**
+ * Compacts the buffer (ByteBuffer.compact) and sets direction to WRITE.
+ * NOTE(review): the direction is not checked first; presumably this is only
+ * meant to be called while the buffer is in READ direction - confirm.
+ */
+ public void compact() {
+ // captured up front so the debug message can show the state before compacting
+ final String before = buffer.toString();
+ buffer.compact();
+ logger.debug("Before compact: {}, after: {}", before, buffer);
+ direction = Direction.WRITE;
+ }
+
+ /**
+ * The two layouts a managed buffer can be in.
+ */
+ public static enum Direction {
+
+ READ, WRITE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
new file mode 100644
index 0000000..32a3f26
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link InputStream} that reads from a {@link SocketChannel} which it puts
+ * into non-blocking mode. Reads poll the channel, sleeping briefly between
+ * attempts, until data arrives, the end of the stream is reached, or the
+ * configured timeout elapses.
+ */
+public class SocketChannelInputStream extends InputStream {
+
+    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeoutMillis = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+    // Holds a byte that isDataAvailable() had to pull off the channel while probing.
+    private Byte bufferedByte = null;
+
+    /**
+     * @param socketChannel the channel to read from; it is switched to non-blocking mode
+     * @throws IOException if the channel cannot be switched to non-blocking mode
+     */
+    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    /**
+     * @param timeoutMillis how long a read may wait for data before failing
+     * with a {@link SocketTimeoutException}
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    /**
+     * Reads a single byte, returning the byte buffered by
+     * {@link #isDataAvailable()} first if there is one.
+     *
+     * @return the byte as a value in 0-255, or -1 at end of stream
+     * @throws SocketTimeoutException if no data arrives within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting
+     */
+    @Override
+    public int read() throws IOException {
+        if (bufferedByte != null) {
+            final int retVal = bufferedByte & 0xFF;
+            bufferedByte = null;
+            return retVal;
+        }
+
+        oneByteBuffer.clear();
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        while ((bytesRead = channel.read(oneByteBuffer)) == 0) {
+            awaitData(maxTime);
+        }
+
+        if (bytesRead == -1) {
+            return -1;
+        }
+        oneByteBuffer.flip();
+        return oneByteBuffer.get() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+     * If a byte was buffered by {@link #isDataAvailable()}, only that single
+     * byte is returned.
+     *
+     * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of stream
+     * @throws SocketTimeoutException if no data arrives within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        // A zero-length request must not consume the buffered byte, and a
+        // zero-capacity ByteBuffer would make channel.read() return 0 until
+        // the timeout expires, so answer it immediately.
+        if (len == 0) {
+            return 0;
+        }
+
+        if (bufferedByte != null) {
+            final byte retVal = bufferedByte;
+            bufferedByte = null;
+            b[off] = retVal;
+            return 1;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
+
+        final long maxTime = System.currentTimeMillis() + timeoutMillis;
+        int bytesRead;
+        while ((bytesRead = channel.read(buffer)) == 0) {
+            awaitData(maxTime);
+        }
+
+        return bytesRead;
+    }
+
+    /**
+     * @return 1 if a byte is buffered or could be read from the channel right now, otherwise 0
+     * @throws java.io.EOFException if the peer has closed the stream
+     */
+    @Override
+    public int available() throws IOException {
+        if (bufferedByte != null) {
+            return 1;
+        }
+
+        isDataAvailable(); // attempt to read from socket
+        return (bufferedByte == null) ? 0 : 1;
+    }
+
+    /**
+     * Probes the channel for data without waiting. A byte read while probing
+     * is kept and handed out by the next read call.
+     *
+     * @return true if a byte is buffered or was just read from the channel
+     * @throws EOFException if the channel reports end of stream
+     */
+    public boolean isDataAvailable() throws IOException {
+        if (bufferedByte != null) {
+            return true;
+        }
+
+        oneByteBuffer.clear();
+        final int bytesRead = channel.read(oneByteBuffer);
+        if (bytesRead == -1) {
+            throw new EOFException("Peer has closed the stream");
+        }
+        if (bytesRead > 0) {
+            oneByteBuffer.flip();
+            bufferedByte = oneByteBuffer.get();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Closes the underlying socket channel.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    /**
+     * Called after a read attempt produced no data: fails if the deadline has
+     * passed, otherwise sleeps briefly before the caller retries.
+     */
+    private void awaitData(final long maxTime) throws IOException {
+        if (System.currentTimeMillis() > maxTime) {
+            throw new SocketTimeoutException("Timed out reading from socket");
+        }
+        try {
+            TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+        } catch (final InterruptedException e) {
+            close();
+            Thread.currentThread().interrupt(); // set the interrupt status
+            throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
new file mode 100644
index 0000000..77049ad
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An {@link OutputStream} that writes to a {@link SocketChannel} which it puts
+ * into non-blocking mode. Writes poll the channel, sleeping briefly whenever
+ * nothing could be written, until all bytes are sent or the timeout elapses.
+ */
+public class SocketChannelOutputStream extends OutputStream {
+
+    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeout = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+
+    /**
+     * @param socketChannel the channel to write to; it is switched to non-blocking mode
+     * @throws IOException if the channel cannot be switched to non-blocking mode
+     */
+    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    /**
+     * @param timeoutMillis how long a write may go without progress before
+     * failing with a {@link SocketTimeoutException}
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeout = timeoutMillis;
+    }
+
+    /**
+     * Writes the low-order byte of {@code b} to the channel.
+     *
+     * @throws SocketTimeoutException if the byte cannot be written within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting
+     */
+    @Override
+    public void write(final int b) throws IOException {
+        oneByteBuffer.clear();
+        oneByteBuffer.put((byte) b);
+        oneByteBuffer.flip();
+
+        final long deadline = System.currentTimeMillis() + this.timeout;
+        while (oneByteBuffer.hasRemaining()) {
+            if (channel.write(oneByteBuffer) != 0) {
+                return;
+            }
+            pauseBeforeRetry(deadline);
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Writes {@code len} bytes of {@code b} starting at {@code off}. The
+     * timeout is measured from the last time any bytes were written.
+     *
+     * @throws SocketTimeoutException if no progress is made within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting
+     */
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        final ByteBuffer pending = ByteBuffer.wrap(b, off, len);
+
+        final int timeoutMillis = this.timeout;
+        long deadline = System.currentTimeMillis() + timeoutMillis;
+        while (pending.hasRemaining()) {
+            if (channel.write(pending) == 0) {
+                pauseBeforeRetry(deadline);
+            } else {
+                // progress was made, so restart the timeout window
+                deadline = System.currentTimeMillis() + timeoutMillis;
+            }
+        }
+    }
+
+    /**
+     * Closes the underlying SocketChannel
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    /**
+     * Called after a write attempt made no progress: fails if the deadline
+     * has passed, otherwise sleeps briefly before the caller retries.
+     */
+    private void pauseBeforeRetry(final long deadline) throws IOException {
+        if (System.currentTimeMillis() > deadline) {
+            throw new SocketTimeoutException("Timed out writing to socket");
+        }
+        try {
+            TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+        } catch (final InterruptedException e) {
+            close();
+            Thread.currentThread().interrupt(); // set the interrupt status
+            throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..5810488
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Layers SSL/TLS on top of a non-blocking {@link SocketChannel} by driving an
+ * {@link SSLEngine} directly. Three {@link BufferStateManager}s hold the
+ * encrypted inbound bytes, the encrypted outbound bytes and the decrypted
+ * application data. Reads and writes poll the channel and give up after the
+ * configured timeout.
+ */
+public class SSLSocketChannel implements Closeable {
+
+    /** Largest slice of caller data handed to the engine per iteration in {@link #write(byte[], int, int)}. */
+    public static final int MAX_WRITE_SIZE = 65536;
+
+    private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
+    // How long to sleep between polls when the channel has nothing to read or cannot accept more bytes.
+    private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+    private final String hostname;
+    private final int port;
+    private final SSLEngine engine;
+    private final SocketAddress socketAddress;
+
+    private BufferStateManager streamInManager;   // encrypted bytes read from the channel
+    private BufferStateManager streamOutManager;  // encrypted bytes waiting to be written to the channel
+    private BufferStateManager appDataManager;    // decrypted bytes not yet handed to the caller
+
+    private SocketChannel channel;
+
+    private final byte[] oneByteBuffer = new byte[1];
+
+    private int timeoutMillis = 30000;
+    private volatile boolean connected = false;
+    private boolean handshaking = false;
+    private boolean closed = false;
+    private volatile boolean interrupted = false;
+
+    /**
+     * Creates a channel that will connect to {@code hostname:port} when
+     * {@link #connect()} is called.
+     *
+     * @param sslContext used to create the SSL engine
+     * @param hostname remote host
+     * @param port remote port
+     * @param client whether the engine runs in client mode
+     * @throws IOException if the socket channel cannot be opened
+     */
+    public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
+        this.socketAddress = new InetSocketAddress(hostname, port);
+        this.channel = SocketChannel.open();
+        this.hostname = hostname;
+        this.port = port;
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    /**
+     * Wraps an already-connected socket channel.
+     *
+     * @param sslContext used to create the SSL engine
+     * @param socketChannel a connected channel
+     * @param client whether the engine runs in client mode
+     * @throws IllegalArgumentException if {@code socketChannel} is not connected
+     * @throws IOException if the remote address cannot be obtained
+     */
+    public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
+        if (!socketChannel.isConnected()) {
+            throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+        }
+
+        this.channel = socketChannel;
+
+        this.socketAddress = socketChannel.getRemoteAddress();
+        final Socket socket = socketChannel.socket();
+        this.hostname = socket.getInetAddress().getHostName();
+        this.port = socket.getPort();
+
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    /**
+     * @param millis timeout, in milliseconds, applied to connecting, reading and writing
+     */
+    public void setTimeout(final int millis) {
+        this.timeoutMillis = millis;
+    }
+
+    /**
+     * @return the timeout in milliseconds
+     */
+    public int getTimeout() {
+        return timeoutMillis;
+    }
+
+    /**
+     * Connects the underlying channel if necessary and performs the SSL
+     * handshake. On any failure the channel and the engine are closed and the
+     * original exception is rethrown.
+     *
+     * @throws SSLHandshakeException if the handshake fails
+     * @throws SocketTimeoutException if connecting takes longer than the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting to connect
+     * @throws IOException on any other I/O failure
+     */
+    public void connect() throws SSLHandshakeException, IOException {
+        try {
+            channel.configureBlocking(false);
+            if (!channel.isConnected()) {
+                final long startTime = System.currentTimeMillis();
+
+                if (!channel.connect(socketAddress)) {
+                    while (!channel.finishConnect()) {
+                        if (interrupted) {
+                            throw new TransmissionDisabledException();
+                        }
+                        if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                            throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
+                        }
+
+                        try {
+                            Thread.sleep(50L);
+                        } catch (final InterruptedException e) {
+                            // Treat interruption the same way readData/writeFully do rather than
+                            // swallowing it; the catch block below closes the channel.
+                            Thread.currentThread().interrupt(); // set the interrupt status
+                            throw new ClosedByInterruptException();
+                        }
+                    }
+                }
+            }
+            engine.beginHandshake();
+
+            performHandshake();
+            logger.debug("{} Successfully completed SSL handshake", this);
+
+            streamInManager.clear();
+            streamOutManager.clear();
+            appDataManager.clear();
+
+            connected = true;
+        } catch (final Exception e) {
+            logger.error("{} Failed to connect due to {}", this, e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            closeQuietly(channel);
+            try {
+                engine.closeInbound();
+            } catch (final IOException closeFailure) {
+                // closeInbound() can itself throw; do not let that replace the exception
+                // that actually caused the connect to fail.
+                logger.debug("{} Ignoring failure to close inbound side of SSL engine: {}", this, closeFailure.toString());
+            }
+            engine.closeOutbound();
+            throw e;
+        }
+    }
+
+    /**
+     * @return the trimmed subject DN of the first certificate in the peer's certificate chain
+     * @throws SSLPeerUnverifiedException if the peer presented no certificates
+     * @throws CertificateExpiredException if that certificate has expired
+     * @throws CertificateNotYetValidException if that certificate is not yet valid
+     */
+    public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
+        final X509Certificate[] certs = engine.getSession().getPeerCertificateChain();
+        if (certs == null || certs.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
+        }
+
+        final X509Certificate cert = certs[0];
+        cert.checkValidity();
+        return cert.getSubjectDN().getName().trim();
+    }
+
+    /**
+     * Drives the engine's handshake state machine until it reports FINISHED
+     * or NOT_HANDSHAKING, wrapping and sending, or reading and unwrapping,
+     * handshake data as the engine requests.
+     */
+    private void performHandshake() throws IOException {
+        // Generate handshake message
+        final byte[] emptyMessage = new byte[0];
+        handshaking = true;
+        logger.debug("{} Performing Handshake", this);
+
+        try {
+            while (true) {
+                switch (engine.getHandshakeStatus()) {
+                    case FINISHED:
+                        return;
+                    case NEED_WRAP: {
+                        final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+
+                        final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
+                        if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                            // NOTE(review): this asks for the same size as above, so it looks like
+                            // the buffer would not actually grow before retrying - confirm.
+                            streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+                            continue;
+                        }
+
+                        if (wrapHelloResult.getStatus() != Status.OK) {
+                            throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
+                                    + wrapHelloResult.toString());
+                        }
+
+                        logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
+
+                        final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+                        final int bytesToSend = readableStreamOut.remaining();
+                        writeFully(readableStreamOut);
+                        logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
+
+                        streamOutManager.clear();
+                    }
+                    continue;
+                    case NEED_UNWRAP: {
+                        final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
+                        final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        // Read handshake response from other side
+                        logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
+                        SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
+                        logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
+
+                        if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                            // The engine needs more encrypted bytes than are buffered; pull more from the channel.
+                            final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                            final int bytesRead = readData(writableDataIn);
+                            if (bytesRead > 0) {
+                                logger.trace("{} Read {} bytes for handshake", this, bytesRead);
+                            }
+
+                            if (bytesRead < 0) {
+                                throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
+                            }
+                        } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
+                            throw new IOException("Channel was closed by peer during handshake");
+                        } else {
+                            // Drop the bytes the engine consumed and discard anything it produced.
+                            streamInManager.compact();
+                            appDataManager.clear();
+                        }
+                    }
+                    break;
+                    case NEED_TASK:
+                        performTasks();
+                        continue;
+                    case NOT_HANDSHAKING:
+                        return;
+                }
+            }
+        } finally {
+            handshaking = false;
+        }
+    }
+
+    /** Runs every delegated task the engine currently has queued, on the calling thread. */
+    private void performTasks() {
+        Runnable runnable;
+        while ((runnable = engine.getDelegatedTask()) != null) {
+            runnable.run();
+        }
+    }
+
+    /** Closes the given resource, deliberately ignoring any exception it throws. */
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            closeable.close();
+        } catch (final Exception e) {
+        }
+    }
+
+    /**
+     * Polls the channel until at least one byte is read into {@code dest},
+     * end of stream is reported, or the timeout elapses.
+     *
+     * @return the number of bytes read, 0 if {@code dest} has no room, or a negative value at end of stream
+     * @throws SocketTimeoutException if nothing is read within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting
+     */
+    private int readData(final ByteBuffer dest) throws IOException {
+        final long startTime = System.currentTimeMillis();
+
+        while (true) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            if (dest.remaining() == 0) {
+                return 0;
+            }
+
+            final int readCount = channel.read(dest);
+
+            if (readCount == 0) {
+                if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException();
+                }
+
+                continue;
+            }
+
+            logger.trace("{} Read {} bytes", this, readCount);
+            return readCount;
+        }
+    }
+
+    /**
+     * Encrypts everything readable in {@code src} and writes the result to
+     * the channel, one successful wrap at a time.
+     *
+     * @return {@link Status#OK} if all data was encrypted and written (or
+     * there was nothing to encrypt), otherwise the first non-OK status the
+     * engine reported
+     */
+    private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
+        // Start from OK so that an empty source yields a status instead of a NullPointerException.
+        Status status = Status.OK;
+
+        final ByteBuffer buff = src.prepareForRead(0);
+        final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+        logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+        while (buff.remaining() > 0) {
+            status = engine.wrap(buff, outBuff).getStatus();
+            if (status != Status.OK) {
+                return status;
+            }
+
+            final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
+            writeFully(readableOutBuff);
+            streamOutManager.clear();
+        }
+
+        return status;
+    }
+
+    /**
+     * Writes all remaining bytes of {@code src} to the channel, polling when
+     * the channel accepts nothing. The timeout is measured from the last time
+     * any bytes were written.
+     *
+     * @throws SocketTimeoutException if no progress is made within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while waiting
+     */
+    private void writeFully(final ByteBuffer src) throws IOException {
+        long lastByteWrittenTime = System.currentTimeMillis();
+
+        int bytesWritten = 0;
+        while (src.hasRemaining()) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            final int written = channel.write(src);
+            bytesWritten += written;
+            final long now = System.currentTimeMillis();
+            if (written > 0) {
+                lastByteWrittenTime = now;
+            } else {
+                if (now > lastByteWrittenTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (final InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException();
+                }
+            }
+        }
+
+        logger.trace("{} Wrote {} bytes", this, bytesWritten);
+    }
+
+    /**
+     * Reports whether the connection is closed. Besides checking the local
+     * flag, this attempts a non-blocking read to detect end of stream or a
+     * closure message from the peer; if either is found the channel is closed.
+     *
+     * @return true if the channel is closed, or was closed by this call
+     */
+    public boolean isClosed() {
+        if (closed) {
+            return true;
+        }
+        // need to detect if peer has sent closure handshake...if so the answer is true
+        final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        int readCount = 0;
+        try {
+            readCount = channel.read(writableInBuffer);
+        } catch (IOException e) {
+            logger.error("{} Failed to readData due to {}", new Object[]{this, e});
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            readCount = -1; // treat the condition same as if End of Stream
+        }
+        if (readCount == 0) {
+            return false;
+        }
+        if (readCount > 0) {
+            logger.trace("{} Read {} bytes", this, readCount);
+
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            try {
+                SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+                logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+                if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+                    // Drain the incoming TCP buffer
+                    final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+                    int bytesDiscarded = channel.read(discardBuffer);
+                    while (bytesDiscarded > 0) {
+                        discardBuffer.clear();
+                        bytesDiscarded = channel.read(discardBuffer);
+                    }
+                    engine.closeInbound();
+                } else {
+                    streamInManager.compact();
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
+                if (logger.isDebugEnabled()) {
+                    logger.error("", e);
+                }
+            }
+        }
+        // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
+        // so go ahead and close down the channel
+        closeQuietly(channel.socket());
+        closeQuietly(channel);
+        closed = true;
+        return true;
+    }
+
+    /**
+     * Closes the outbound side of the engine, sends the resulting closure
+     * data to the peer, then drains whatever is readable and closes the
+     * socket and channel. Does nothing if already closed.
+     *
+     * @throws IOException if the engine does not report CLOSED after closing
+     * outbound, or if sending the closure data fails; the channel is closed
+     * regardless
+     */
+    @Override
+    public void close() throws IOException {
+        logger.debug("{} Closing Connection", this);
+        if (channel == null) {
+            return;
+        }
+
+        if (closed) {
+            return;
+        }
+
+        try {
+            engine.closeOutbound();
+
+            final byte[] emptyMessage = new byte[0];
+
+            final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+            final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
+
+            if (handshakeResult.getStatus() != Status.CLOSED) {
+                throw new IOException("Invalid close state - will not send network data");
+            }
+
+            final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+            writeFully(readableStreamOut);
+        } finally {
+            // Drain the incoming TCP buffer
+            final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+            try {
+                int bytesDiscarded = channel.read(discardBuffer);
+                while (bytesDiscarded > 0) {
+                    discardBuffer.clear();
+                    bytesDiscarded = channel.read(discardBuffer);
+                }
+            } catch (Exception e) {
+            }
+
+            closeQuietly(channel.socket());
+            closeQuietly(channel);
+            closed = true;
+        }
+    }
+
+    /**
+     * Copies up to {@code len} already-decrypted bytes into {@code buffer}.
+     *
+     * @return the number of bytes copied, 0 if no decrypted data is buffered
+     */
+    private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
+        // If any data already exists in the application data buffer, copy it to the buffer.
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+
+        final int appDataRemaining = appDataBuffer.remaining();
+        if (appDataRemaining > 0) {
+            final int bytesToCopy = Math.min(len, appDataBuffer.remaining());
+            appDataBuffer.get(buffer, offset, bytesToCopy);
+
+            final int bytesCopied = appDataRemaining - appDataBuffer.remaining();
+            logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
+                    new Object[]{this, bytesToCopy, bytesCopied});
+            return bytesCopied;
+        }
+        return 0;
+    }
+
+    /**
+     * @return the number of bytes currently buffered (decrypted plus still
+     * encrypted), after one non-blocking attempt to read from the channel if
+     * nothing was buffered
+     */
+    public int available() throws IOException {
+        ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+        final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining();
+        if (buffered > 0) {
+            return buffered;
+        }
+
+        final boolean wasAbleToRead = isDataAvailable();
+        if (!wasAbleToRead) {
+            return 0;
+        }
+
+        appDataBuffer = appDataManager.prepareForRead(1);
+        streamDataBuffer = streamInManager.prepareForRead(1);
+        return appDataBuffer.remaining() + streamDataBuffer.remaining();
+    }
+
+    /**
+     * @return true if any bytes are buffered, or if a non-blocking read from
+     * the channel just produced some
+     */
+    public boolean isDataAvailable() throws IOException {
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+
+        if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) {
+            return true;
+        }
+
+        final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        final int bytesRead = channel.read(writableBuffer);
+        return (bytesRead > 0);
+    }
+
+    /**
+     * @return the next decrypted byte as a value in 0-255, or -1 at end of stream
+     */
+    public int read() throws IOException {
+        final int bytesRead = read(oneByteBuffer);
+        if (bytesRead == -1) {
+            return -1;
+        }
+        return oneByteBuffer[0] & 0xFF;
+    }
+
+    public int read(final byte[] buffer) throws IOException {
+        return read(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Reads up to {@code len} decrypted bytes into {@code buffer}, connecting
+     * first if necessary. Buffered application data is returned immediately;
+     * otherwise encrypted data is read and unwrapped until some is available.
+     *
+     * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at end of stream
+     * @throws IOException if the engine reports the channel closed, or if an
+     * unwrap succeeds without producing any application data
+     */
+    public int read(final byte[] buffer, final int offset, final int len) throws IOException {
+        logger.debug("{} Reading up to {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        // A zero-length request copies nothing, so without this guard it would fall through
+        // to appDataManager.clear() below and throw away application data that is still buffered.
+        if (len == 0) {
+            return 0;
+        }
+
+        int copied = copyFromAppDataBuffer(buffer, offset, len);
+        if (copied > 0) {
+            return copied;
+        }
+
+        appDataManager.clear();
+
+        while (true) {
+            // prepare buffers and call unwrap
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            SSLEngineResult unwrapResponse = null;
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+            logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+
+            switch (unwrapResponse.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
+                case BUFFER_UNDERFLOW: {
+                    // Not enough encrypted bytes for a full record yet; read more and try again.
+                    final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                    final int bytesRead = readData(writableInBuffer);
+                    if (bytesRead < 0) {
+                        return -1;
+                    }
+
+                    continue;
+                }
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case OK: {
+                    copied = copyFromAppDataBuffer(buffer, offset, len);
+                    if (copied == 0) {
+                        throw new IOException("Failed to decrypt data");
+                    }
+                    streamInManager.compact();
+                    return copied;
+                }
+            }
+        }
+    }
+
+    public void write(final int data) throws IOException {
+        write(new byte[]{(byte) data}, 0, 1);
+    }
+
+    public void write(final byte[] data) throws IOException {
+        write(data, 0, data.length);
+    }
+
+    /**
+     * Encrypts and sends {@code len} bytes of {@code data} starting at
+     * {@code offset}, connecting first if necessary. The data is processed in
+     * slices of at most {@link #MAX_WRITE_SIZE} bytes.
+     *
+     * @throws IOException if the engine reports the channel closed or the write fails
+     */
+    public void write(final byte[] data, final int offset, final int len) throws IOException {
+        logger.debug("{} Writing {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int iterations = len / MAX_WRITE_SIZE;
+        if (len % MAX_WRITE_SIZE > 0) {
+            iterations++;
+        }
+
+        for (int i = 0; i < iterations; i++) {
+            streamOutManager.clear();
+            // How far into the caller's range this slice starts. The slice length must be
+            // derived from this, not from the absolute array index, or a non-zero offset
+            // shortens (or makes negative) the amount written.
+            final int alreadySliced = i * MAX_WRITE_SIZE;
+            final int itrOffset = offset + alreadySliced;
+            final int itrLen = Math.min(len - alreadySliced, MAX_WRITE_SIZE);
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
+
+            final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
+            final Status status = encryptAndWriteFully(buffMan);
+            switch (status) {
+                case BUFFER_OVERFLOW:
+                    // NOTE(review): after growing the buffers the loop moves on to the next slice;
+                    // it looks like the part of this slice that was not yet encrypted is not
+                    // retried - confirm.
+                    streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+                    appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+                    continue;
+                case OK:
+                    continue;
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case BUFFER_UNDERFLOW:
+                    throw new AssertionError("Got Buffer Underflow but should not have...");
+            }
+        }
+    }
+
+    /**
+     * Sets a flag that makes in-progress and future connect, read and write
+     * loops throw a {@link TransmissionDisabledException}.
+     */
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..154bd08
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Adapts an {@link SSLSocketChannel} to the {@link InputStream} API by
+ * forwarding every call to the channel.
+ */
+public class SSLSocketChannelInputStream extends InputStream {
+
+    private final SSLSocketChannel channel;
+
+    /**
+     * @param channel the channel that all reads are forwarded to
+     */
+    public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    /** Forwards to {@link SSLSocketChannel#read()}. */
+    @Override
+    public int read() throws IOException {
+        return channel.read();
+    }
+
+    /** Forwards to {@link SSLSocketChannel#read(byte[])}. */
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return channel.read(b);
+    }
+
+    /** Forwards to {@link SSLSocketChannel#read(byte[], int, int)}. */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        return channel.read(b, off, len);
+    }
+
+    /** Forwards to {@link SSLSocketChannel#available()}. */
+    @Override
+    public int available() throws IOException {
+        return channel.available();
+    }
+
+    /**
+     * @return true if {@link #available()} reports at least one byte
+     */
+    public boolean isDataAvailable() throws IOException {
+        return available() > 0;
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which will also close the
+     * OutputStream and connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..ce4e420
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+ private final SSLSocketChannel channel;
+
+ public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void write(final int b) throws IOException {
+ channel.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ channel.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ channel.write(b, off, len);
+ }
+
+ /**
+ * Closes the underlying {@link SSLSocketChannel} given to the constructor,
+ * which also will close the InputStream and the connection.
+ */
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
new file mode 100644
index 0000000..aaf37ea
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.InputStream;
+
+/**
+ * Same-named stand-in for {@link java.io.BufferedInputStream}. This class only
+ * forwards its two constructors to the java.io class and overrides nothing, so
+ * every read/skip/mark call is the inherited java.io implementation.
+ * NOTE(review): the intent was an unsynchronized, single-threaded variant, but no
+ * locking is removed here; inherited methods keep java.io's locking - confirm.
+ */
+public class BufferedInputStream extends java.io.BufferedInputStream {
+
+ public BufferedInputStream(final InputStream in) {
+ super(in);
+ }
+
+ public BufferedInputStream(final InputStream in, final int size) {
+ super(in, size);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1357a17b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
new file mode 100644
index 0000000..eadfcab
--- /dev/null
+++ b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * This class is a slight modification of the
+ * {@link java.io.BufferedOutputStream} class. This implementation differs in
+ * that it does not mark methods as synchronized. This means that this class is
+ * not suitable for writing by multiple concurrent threads. However, the removal
+ * of the synchronized keyword results in potentially much better performance.
+ */
+public class BufferedOutputStream extends FilterOutputStream {
+
+ /**
+ * The internal buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer. This value is always in the
+ * range {@code 0} through {@code buf.length}; elements
+ * {@code buf[0]} through {@code buf[count-1]} contain valid byte data.
+ */
+ protected int count;
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream.
+ *
+ * @param out the underlying output stream.
+ */
+ public BufferedOutputStream(OutputStream out) {
+ this(out, 8192);
+ }
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream with the specified buffer size.
+ *
+ * @param out the underlying output stream.
+ * @param size the buffer size.
+ * @exception IllegalArgumentException if size &lt;= 0.
+ */
+ public BufferedOutputStream(OutputStream out, int size) {
+ super(out);
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buf = new byte[size];
+ }
+
+ /**
+ * Writes any buffered bytes to the underlying stream and resets count to 0; does not call out.flush().
+ */
+ private void flushBuffer() throws IOException {
+ if (count > 0) {
+ out.write(buf, 0, count);
+ count = 0;
+ }
+ }
+
+ /**
+ * Writes the specified byte to this buffered output stream.
+ *
+ * @param b the byte to be written.
+ * @exception IOException if an I/O error occurs.
+ */
+ @Override
+ public void write(int b) throws IOException {
+ if (count >= buf.length) {
+ flushBuffer();
+ }
+ buf[count++] = (byte) b;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this buffered output stream.
+ *
+ * <p>
+ * Ordinarily this method stores bytes from the given array into this
+ * stream's buffer, flushing the buffer to the underlying output stream as
+ * needed. If the requested length is at least as large as this stream's
+ * buffer, however, then this method will flush the buffer and write the
+ * bytes directly to the underlying output stream. Thus redundant
+ * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
+ *
+ * @param b the data.
+ * @param off the start offset in the data.
+ * @param len the number of bytes to write.
+ * @exception IOException if an I/O error occurs.
+ */
+ @Override
+ public void write(byte b[], int off, int len) throws IOException {
+ if (len >= buf.length) {
+ /* If the request length is at least the size of the output buffer,
+ flush the output buffer and then write the data directly.
+ In this way buffered streams will cascade harmlessly. */
+ flushBuffer();
+ out.write(b, off, len);
+ return;
+ }
+ if (len >= buf.length - count) { // NOTE(review): also flushes when len would exactly fill the remaining space
+ flushBuffer();
+ }
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ /**
+ * Flushes this buffered output stream. This forces any buffered output
+ * bytes to be written out to the underlying output stream.
+ *
+ * @exception IOException if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ @Override
+ public void flush() throws IOException {
+ flushBuffer();
+ out.flush();
+ }
+}