You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:00 UTC
[37/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
new file mode 100644
index 0000000..bc46b0f
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+
+/**
+ * An {@link OutputStream} that collects written bytes in a fixed-size buffer
+ * and forwards them to the wrapped stream as individually deflated chunks.
+ * <p>
+ * Wire format, as produced by this class:
+ * <ul>
+ * <li>every chunk except the first is preceded by a single {@code 1} byte;</li>
+ * <li>a chunk consists of {@link #SYNC_BYTES}, the uncompressed length and the
+ * compressed length (each written as 4 bytes, most significant byte first),
+ * followed by the compressed bytes;</li>
+ * <li>{@link #close()} appends a single {@code 0} byte.</li>
+ * </ul>
+ * This class performs no synchronization of its own.
+ */
+public class CompressionOutputStream extends OutputStream {
+
+    public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'};
+
+    public static final int DEFAULT_COMPRESSION_LEVEL = 1;
+    public static final int DEFAULT_BUFFER_SIZE = 64 << 10;
+    public static final int MIN_BUFFER_SIZE = 8 << 10;
+
+    private final OutputStream out;
+    private final Deflater deflater;
+
+    private final byte[] buffer;
+    // Not final: grown on demand when a chunk does not fit (see compressAndWrite).
+    private byte[] compressed;
+
+    private int bufferIndex = 0;
+    private boolean dataWritten = false;
+    private boolean closed = false;
+
+    /**
+     * Creates a stream using {@link #DEFAULT_BUFFER_SIZE}.
+     *
+     * @param outStream the stream that receives the compressed chunks
+     */
+    public CompressionOutputStream(final OutputStream outStream) {
+        this(outStream, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a stream using {@link #DEFAULT_COMPRESSION_LEVEL} and
+     * {@link Deflater#DEFAULT_STRATEGY}.
+     *
+     * @param outStream the stream that receives the compressed chunks
+     * @param bufferSize number of uncompressed bytes collected per chunk
+     * @throws IllegalArgumentException if {@code bufferSize} is smaller than
+     * {@link #MIN_BUFFER_SIZE}
+     */
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize) {
+        this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY);
+    }
+
+    /**
+     * @param outStream the stream that receives the compressed chunks
+     * @param bufferSize number of uncompressed bytes collected per chunk
+     * @param level compression level handed to {@link Deflater}
+     * @param strategy compression strategy handed to {@link Deflater}
+     * @throws IllegalArgumentException if {@code bufferSize} is smaller than
+     * {@link #MIN_BUFFER_SIZE}
+     */
+    public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE);
+        }
+
+        this.out = outStream;
+        this.deflater = new Deflater(level);
+        this.deflater.setStrategy(strategy);
+        buffer = new byte[bufferSize];
+        compressed = new byte[bufferSize + 64];
+    }
+
+    /**
+     * Compresses the currently buffered chunk of data and sends it to the
+     * output stream. Does nothing when no data is buffered.
+     *
+     * @throws IOException if the stream has been closed or the wrapped stream
+     * fails
+     */
+    protected void compressAndWrite() throws IOException {
+        if (bufferIndex <= 0) {
+            return;
+        }
+        ensureOpen();
+
+        deflater.setInput(buffer, 0, bufferIndex);
+        deflater.finish();
+
+        // The compressed length goes into the header, so the whole chunk must be
+        // deflated before anything is written. Incompressible input can expand
+        // beyond the initial capacity; keep deflating into a larger array until
+        // the deflater reports that all output has been produced.
+        int compressedBytes = deflater.deflate(compressed);
+        while (!deflater.finished()) {
+            final byte[] larger = new byte[compressed.length + (compressed.length >> 3) + 64];
+            System.arraycopy(compressed, 0, larger, 0, compressedBytes);
+            compressed = larger;
+            compressedBytes += deflater.deflate(compressed, compressedBytes, compressed.length - compressedBytes);
+        }
+
+        writeChunkHeader(compressedBytes);
+        out.write(compressed, 0, compressedBytes);
+
+        bufferIndex = 0;
+        deflater.reset();
+    }
+
+    /**
+     * Writes the continuation marker (if a chunk was already sent), the sync
+     * bytes, the uncompressed length and the compressed length.
+     */
+    private void writeChunkHeader(final int compressedBytes) throws IOException {
+        // If we have already written data, write out a '1' to indicate that we have more data; when we close
+        // the stream, we instead write a '0' to indicate that we are finished sending data.
+        if (dataWritten) {
+            out.write(1);
+        }
+        out.write(SYNC_BYTES);
+        dataWritten = true;
+
+        writeInt(out, bufferIndex);
+        writeInt(out, compressedBytes);
+    }
+
+    /**
+     * Writes {@code val} as four bytes, most significant byte first.
+     */
+    private void writeInt(final OutputStream out, final int val) throws IOException {
+        out.write(val >>> 24);
+        out.write(val >>> 16);
+        out.write(val >>> 8);
+        out.write(val);
+    }
+
+    /**
+     * @return {@code true} if the uncompressed buffer has no free space left
+     */
+    protected boolean bufferFull() {
+        return bufferIndex >= buffer.length;
+    }
+
+    private void ensureOpen() throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        ensureOpen();
+        buffer[bufferIndex++] = (byte) (b & 0xFF);
+        if (bufferFull()) {
+            compressAndWrite();
+        }
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        ensureOpen();
+        int bytesLeft = len;
+        while (bytesLeft > 0) {
+            final int free = buffer.length - bufferIndex;
+            final int bytesThisIteration = Math.min(bytesLeft, free);
+            System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration);
+            bufferIndex += bytesThisIteration;
+
+            bytesLeft -= bytesThisIteration;
+            if (bufferFull()) {
+                compressAndWrite();
+            }
+        }
+    }
+
+    /**
+     * Emits any buffered data as a chunk and flushes the wrapped stream.
+     *
+     * @throws IOException if the wrapped stream fails
+     */
+    @Override
+    public void flush() throws IOException {
+        compressAndWrite();
+        // OutputStream.flush() is a no-op; the wrapped stream is the one that must be flushed.
+        out.flush();
+    }
+
+    /**
+     * Emits any buffered data, writes the end-of-stream marker, flushes the
+     * wrapped stream and releases the deflater. The wrapped stream itself is
+     * left open. Calling this method more than once has no further effect.
+     *
+     * @throws IOException if the wrapped stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        try {
+            compressAndWrite();
+            out.write(0); // indicate that the stream is finished.
+            out.flush();
+        } finally {
+            closed = true;
+            deflater.end(); // release the deflater's resources even if the final write failed
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
new file mode 100644
index 0000000..e03dfbf
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * Delegating {@link InputStream} that can be switched off: once
+ * {@link #interrupt()} has been called, every overridden method throws
+ * {@link TransmissionDisabledException} instead of touching the wrapped
+ * stream.
+ */
+public class InterruptableInputStream extends InputStream {
+
+    private final InputStream in;
+    private volatile boolean interrupted = false;
+
+    /**
+     * @param in the stream that all calls are forwarded to
+     */
+    public InterruptableInputStream(final InputStream in) {
+        this.in = in;
+    }
+
+    /**
+     * Throws if {@link #interrupt()} has been called on this stream.
+     */
+    private void failIfInterrupted() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        failIfInterrupted();
+        return in.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        failIfInterrupted();
+        return in.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        failIfInterrupted();
+        return in.read(b, off, len);
+    }
+
+    @Override
+    public int available() throws IOException {
+        failIfInterrupted();
+        return in.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Note: an interrupted stream refuses to close the wrapped stream as well.
+        failIfInterrupted();
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        failIfInterrupted();
+        in.mark(readlimit);
+    }
+
+    @Override
+    public boolean markSupported() {
+        failIfInterrupted();
+        return in.markSupported();
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        failIfInterrupted();
+        in.reset();
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        failIfInterrupted();
+        return in.skip(n);
+    }
+
+    /**
+     * Disables this stream; subsequent calls to the overridden stream methods
+     * throw {@link TransmissionDisabledException}.
+     */
+    public void interrupt() {
+        interrupted = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
new file mode 100644
index 0000000..cba5be6
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+
+/**
+ * Delegating {@link OutputStream} that can be switched off: once
+ * {@link #interrupt()} has been called, every overridden method throws
+ * {@link TransmissionDisabledException} instead of touching the wrapped
+ * stream.
+ */
+public class InterruptableOutputStream extends OutputStream {
+
+    private volatile boolean interrupted = false;
+    private final OutputStream out;
+
+    /**
+     * @param out the stream that all calls are forwarded to
+     */
+    public InterruptableOutputStream(final OutputStream out) {
+        this.out = out;
+    }
+
+    /**
+     * Throws if {@link #interrupt()} has been called on this stream.
+     */
+    private void failIfInterrupted() {
+        if (interrupted) {
+            throw new TransmissionDisabledException();
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        failIfInterrupted();
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        failIfInterrupted();
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        failIfInterrupted();
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Note: an interrupted stream refuses to close the wrapped stream as well.
+        failIfInterrupted();
+        out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        failIfInterrupted();
+        out.flush();
+    }
+
+    /**
+     * Disables this stream; subsequent calls to the overridden stream methods
+     * throw {@link TransmissionDisabledException}.
+     */
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
new file mode 100644
index 0000000..68913bd
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps a {@link ByteBuffer} together with a flag recording whether the
+ * buffer is currently set up for being written to or read from, and flips
+ * the buffer whenever the requested direction differs from the recorded one.
+ */
+public class BufferStateManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class);
+
+    private ByteBuffer buffer;
+    private Direction direction;
+
+    /**
+     * Manages the given buffer, treating it as being in WRITE direction.
+     *
+     * @param buffer the buffer to manage
+     */
+    public BufferStateManager(final ByteBuffer buffer) {
+        this(buffer, Direction.WRITE);
+    }
+
+    /**
+     * @param buffer the buffer to manage
+     * @param direction the direction the buffer is currently in
+     */
+    public BufferStateManager(final ByteBuffer buffer, final Direction direction) {
+        this.buffer = buffer;
+        this.direction = direction;
+    }
+
+    /**
+     * Ensures that the buffer is at least as big as the size specified,
+     * resizing the buffer if necessary. This operation MAY change the direction
+     * of the buffer.
+     *
+     * @param requiredSize the minimum capacity the buffer must have
+     */
+    public void ensureSize(final int requiredSize) {
+        if (requiredSize <= buffer.capacity()) {
+            return;
+        }
+
+        // The old buffer is about to be read from, so it has to be in read mode.
+        if (direction == Direction.WRITE) {
+            buffer.flip();
+        }
+
+        // Move the remaining content into a larger buffer and adopt that one.
+        final ByteBuffer replacement = ByteBuffer.allocate(requiredSize);
+        replacement.put(buffer);
+        buffer = replacement;
+
+        // A freshly filled buffer is positioned for further writes.
+        direction = Direction.WRITE;
+    }
+
+    /**
+     * Returns the buffer set up for writing, growing it to
+     * {@code requiredSize} first if necessary. The limit is always reset to the
+     * capacity.
+     *
+     * @param requiredSize the minimum capacity the buffer must have
+     * @return the managed buffer
+     */
+    public ByteBuffer prepareForWrite(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        final boolean wasReading = (direction == Direction.READ);
+        if (wasReading) {
+            // continue writing right after the existing content
+            buffer.position(buffer.limit());
+            direction = Direction.WRITE;
+        }
+
+        buffer.limit(buffer.capacity());
+        return buffer;
+    }
+
+    /**
+     * Returns the buffer set up for reading, growing it to
+     * {@code requiredSize} first if necessary.
+     *
+     * @param requiredSize the minimum capacity the buffer must have
+     * @return the managed buffer
+     */
+    public ByteBuffer prepareForRead(final int requiredSize) {
+        ensureSize(requiredSize);
+
+        final boolean wasWriting = (direction == Direction.WRITE);
+        if (wasWriting) {
+            buffer.flip();
+            direction = Direction.READ;
+        }
+
+        return buffer;
+    }
+
+    /**
+     * Clears the contents of the buffer and sets direction to WRITE
+     */
+    public void clear() {
+        logger.debug("Clearing {}", buffer);
+        buffer.clear();
+        direction = Direction.WRITE;
+    }
+
+    /**
+     * Calls {@link ByteBuffer#compact()} on the buffer and sets direction to
+     * WRITE.
+     */
+    public void compact() {
+        final String stateBefore = buffer.toString();
+        buffer.compact();
+        logger.debug("Before compact: {}, after: {}", stateBefore, buffer);
+        direction = Direction.WRITE;
+    }
+
+    /**
+     * The mode the managed buffer is currently in.
+     */
+    public enum Direction {
+
+        READ, WRITE
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
new file mode 100644
index 0000000..32a3f26
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link InputStream} view of a {@link SocketChannel}. The channel is put into
+ * non-blocking mode by the constructor; reads poll the channel, sleeping
+ * between attempts, until data arrives, the peer closes the connection or the
+ * configured timeout elapses.
+ * <p>
+ * {@link #isDataAvailable()} may pull a single byte off the channel; that byte
+ * is kept and handed out by the next read. This class performs no
+ * synchronization of its own.
+ */
+public class SocketChannelInputStream extends InputStream {
+
+    private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeoutMillis = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+    private Byte bufferedByte = null;
+
+    /**
+     * @param socketChannel the channel to read from; it is switched to
+     * non-blocking mode
+     * @throws IOException if the channel cannot be configured
+     */
+    public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    /**
+     * @param timeoutMillis how long a read may wait for data before failing
+     * with {@link SocketTimeoutException}
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bufferedByte != null) {
+            final int retVal = bufferedByte & 0xFF;
+            bufferedByte = null;
+            return retVal;
+        }
+
+        oneByteBuffer.clear();
+        if (readWithTimeout(oneByteBuffer) == -1) {
+            return -1;
+        }
+        oneByteBuffer.flip();
+        return oneByteBuffer.get() & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Reads up to {@code len} bytes. If a byte was previously pulled off the
+     * channel by {@link #isDataAvailable()}, only that byte is returned.
+     *
+     * @return the number of bytes read, {@code 0} if {@code len} is zero, or
+     * {@code -1} if the peer has closed the connection
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
+     * describe a valid range of {@code b}
+     * @throws SocketTimeoutException if no data arrives within the timeout
+     * @throws ClosedByInterruptException if the thread is interrupted while
+     * waiting; the channel is closed in that case
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            // A non-blocking channel read into an empty buffer always yields 0, which would
+            // otherwise be mistaken for "no data yet" and polled until the timeout fires.
+            return 0;
+        }
+
+        if (bufferedByte != null) {
+            final byte retVal = bufferedByte;
+            bufferedByte = null;
+            b[off] = retVal;
+            return 1;
+        }
+
+        return readWithTimeout(ByteBuffer.wrap(b, off, len));
+    }
+
+    /**
+     * Polls the channel until it delivers data or end-of-stream, sleeping
+     * between attempts and giving up once the timeout has elapsed.
+     *
+     * @param dest buffer with at least one byte remaining
+     * @return the non-zero result of {@link SocketChannel#read(ByteBuffer)}
+     */
+    private int readWithTimeout(final ByteBuffer dest) throws IOException {
+        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or stretch the wait
+        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+        final long start = System.nanoTime();
+
+        while (true) {
+            final int bytesRead = channel.read(dest);
+            if (bytesRead != 0) {
+                return bytesRead;
+            }
+
+            if (System.nanoTime() - start > timeoutNanos) {
+                throw new SocketTimeoutException("Timed out reading from socket");
+            }
+            try {
+                TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS);
+            } catch (final InterruptedException e) {
+                close();
+                Thread.currentThread().interrupt(); // set the interrupt status
+                throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation
+            }
+        }
+    }
+
+    /**
+     * @return {@code 1} if a byte is ready to be read without waiting,
+     * otherwise {@code 0}
+     * @throws java.io.EOFException if the peer has closed the stream
+     */
+    @Override
+    public int available() throws IOException {
+        if (bufferedByte != null) {
+            return 1;
+        }
+
+        isDataAvailable(); // attempt to read from socket
+        return (bufferedByte == null) ? 0 : 1;
+    }
+
+    /**
+     * Checks, without waiting, whether at least one byte can be read. A byte
+     * obtained while checking is retained for the next read.
+     *
+     * @return {@code true} if a byte is ready
+     * @throws EOFException if the peer has closed the stream
+     * @throws IOException if reading from the channel fails
+     */
+    public boolean isDataAvailable() throws IOException {
+        if (bufferedByte != null) {
+            return true;
+        }
+
+        oneByteBuffer.clear();
+        final int bytesRead = channel.read(oneByteBuffer);
+        if (bytesRead == -1) {
+            throw new EOFException("Peer has closed the stream");
+        }
+        if (bytesRead > 0) {
+            oneByteBuffer.flip();
+            bufferedByte = oneByteBuffer.get();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Closes the underlying socket channel.
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
new file mode 100644
index 0000000..77049ad
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link OutputStream} view of a {@link SocketChannel}. The channel is put
+ * into non-blocking mode by the constructor; writes are retried, sleeping
+ * between attempts, until every byte has been accepted by the channel or no
+ * progress has been made for the configured timeout.
+ */
+public class SocketChannelOutputStream extends OutputStream {
+
+    private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+    private final SocketChannel channel;
+    private volatile int timeout = 30000;
+
+    private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1);
+
+    /**
+     * @param socketChannel the channel to write to; it is switched to
+     * non-blocking mode
+     * @throws IOException if the channel cannot be configured
+     */
+    public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException {
+        // this class expects a non-blocking channel
+        socketChannel.configureBlocking(false);
+        this.channel = socketChannel;
+    }
+
+    /**
+     * @param timeoutMillis how long a write may go without progress before
+     * failing with {@link SocketTimeoutException}
+     */
+    public void setTimeout(final int timeoutMillis) {
+        this.timeout = timeoutMillis;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        oneByteBuffer.clear();
+        oneByteBuffer.put((byte) b);
+        oneByteBuffer.flip();
+
+        drain(oneByteBuffer);
+    }
+
+    @Override
+    public void write(final byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        drain(ByteBuffer.wrap(b, off, len));
+    }
+
+    /**
+     * Pushes the remaining content of {@code source} into the channel. The
+     * deadline is pushed back every time the channel accepts some bytes, so
+     * the timeout only fires when the channel stays full for the whole period.
+     */
+    private void drain(final ByteBuffer source) throws IOException {
+        final int allowedMillis = this.timeout;
+        long deadline = System.currentTimeMillis() + allowedMillis;
+
+        while (source.hasRemaining()) {
+            final int accepted = channel.write(source);
+            if (accepted != 0) {
+                deadline = System.currentTimeMillis() + allowedMillis;
+                continue;
+            }
+
+            if (System.currentTimeMillis() > deadline) {
+                throw new SocketTimeoutException("Timed out writing to socket");
+            }
+            try {
+                TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS);
+            } catch (InterruptedException e) {
+                close();
+                Thread.currentThread().interrupt(); // set the interrupt status
+                throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation
+            }
+        }
+    }
+
+    /**
+     * Closes the underlying SocketChannel
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..5810488
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.security.cert.CertificateExpiredException;
+import javax.security.cert.CertificateNotYetValidException;
+import javax.security.cert.X509Certificate;
+
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pairs a {@link SocketChannel} with an {@link SSLEngine} so that callers can
+ * read and write application data through stream-like {@code read}/{@code write}
+ * methods. The channel is switched to non-blocking mode in {@link #connect()};
+ * reads and writes poll it and give up after the configured timeout.
+ */
+public class SSLSocketChannel implements Closeable {
+
+ // Largest slice of caller data handed to the engine per iteration of write(byte[], int, int)
+ public static final int MAX_WRITE_SIZE = 65536;
+
+ private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
+ // Pause between polls when a channel read or write transfers zero bytes (10 ms in nanoseconds)
+ private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+ // Remote peer; used for connecting and in timeout messages
+ private final String hostname;
+ private final int port;
+ private final SSLEngine engine;
+ private final SocketAddress socketAddress;
+
+ // Encrypted bytes read from the channel, waiting to be unwrapped
+ private BufferStateManager streamInManager;
+ // Encrypted bytes produced by wrap, waiting to be written to the channel
+ private BufferStateManager streamOutManager;
+ // Decrypted application bytes produced by unwrap
+ private BufferStateManager appDataManager;
+
+ private SocketChannel channel;
+
+ // Scratch array reused by the single-byte read()
+ private final byte[] oneByteBuffer = new byte[1];
+
+ // Limit, in milliseconds, applied to connecting and to stalled reads/writes
+ private int timeoutMillis = 30000;
+ // Set once connect() has completed the handshake
+ private volatile boolean connected = false;
+ // True only while performHandshake() is running; reported in trace logs
+ private boolean handshaking = false;
+ // Set once the channel has been shut down by close() or isClosed()
+ private boolean closed = false;
+ // Set by interrupt(); polled by connect(), readData() and writeFully()
+ private volatile boolean interrupted = false;
+
+ public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
+ this.socketAddress = new InetSocketAddress(hostname, port);
+ this.channel = SocketChannel.open();
+ this.hostname = hostname;
+ this.port = port;
+ this.engine = sslContext.createSSLEngine();
+ this.engine.setUseClientMode(client);
+ engine.setNeedClientAuth(true);
+
+ streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+ streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+ appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+ }
+
+ /**
+ * Wraps an already-connected socket channel. The remote address, host
+ * name and port are taken from the channel's socket.
+ *
+ * @param sslContext context used to create the SSL engine
+ * @param socketChannel a connected channel
+ * @param client {@code true} to run the engine in client mode
+ * @throws IllegalArgumentException if the channel is not connected
+ * @throws IOException if the remote address cannot be obtained
+ */
+ public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
+ final boolean usable = socketChannel.isConnected();
+ if (!usable) {
+ throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+ }
+
+ this.channel = socketChannel;
+ this.socketAddress = socketChannel.getRemoteAddress();
+
+ final Socket peerSocket = socketChannel.socket();
+ this.hostname = peerSocket.getInetAddress().getHostName();
+ this.port = peerSocket.getPort();
+
+ final SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(client);
+ sslEngine.setNeedClientAuth(true);
+ this.engine = sslEngine;
+
+ // network-side buffers are sized for TLS packets, the plaintext buffer for application data
+ final int packetSize = sslEngine.getSession().getPacketBufferSize();
+ final int applicationSize = sslEngine.getSession().getApplicationBufferSize();
+ streamInManager = new BufferStateManager(ByteBuffer.allocate(packetSize));
+ streamOutManager = new BufferStateManager(ByteBuffer.allocate(packetSize));
+ appDataManager = new BufferStateManager(ByteBuffer.allocate(applicationSize));
+ }
+
+ /**
+ * Sets the timeout that bounds connecting and stalled channel reads and
+ * writes.
+ *
+ * @param millis the timeout in milliseconds
+ */
+ public void setTimeout(final int millis) {
+ this.timeoutMillis = millis;
+ }
+
+ /**
+ * @return the timeout, in milliseconds, currently applied to connecting
+ * and to stalled channel reads and writes
+ */
+ public int getTimeout() {
+ return timeoutMillis;
+ }
+
+ public void connect() throws SSLHandshakeException, IOException {
+ try {
+ channel.configureBlocking(false);
+ if (!channel.isConnected()) {
+ final long startTime = System.currentTimeMillis();
+
+ if (!channel.connect(socketAddress)) {
+ while (!channel.finishConnect()) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
+ }
+ if (System.currentTimeMillis() > startTime + timeoutMillis) {
+ throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
+ }
+
+ try {
+ Thread.sleep(50L);
+ } catch (final InterruptedException e) {
+ }
+ }
+ }
+ }
+ engine.beginHandshake();
+
+ performHandshake();
+ logger.debug("{} Successfully completed SSL handshake", this);
+
+ streamInManager.clear();
+ streamOutManager.clear();
+ appDataManager.clear();
+
+ connected = true;
+ } catch (final Exception e) {
+ logger.error("{} Failed to connect due to {}", this, e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ closeQuietly(channel);
+ engine.closeInbound();
+ engine.closeOutbound();
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the subject DN of the first certificate in the peer's
+ * certificate chain, after checking that the certificate is currently valid.
+ *
+ * @return the trimmed subject DN name
+ * @throws CertificateExpiredException if the certificate has expired
+ * @throws CertificateNotYetValidException if the certificate is not yet valid
+ * @throws SSLPeerUnverifiedException if the peer presented no certificates
+ */
+ public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
+ final X509Certificate[] peerChain = engine.getSession().getPeerCertificateChain();
+ final boolean chainMissing = (peerChain == null) || (peerChain.length == 0);
+ if (chainMissing) {
+ throw new SSLPeerUnverifiedException("No certificates found");
+ }
+
+ final X509Certificate peerCert = peerChain[0];
+ peerCert.checkValidity();
+
+ final String subjectName = peerCert.getSubjectDN().getName();
+ return subjectName.trim();
+ }
+
+ /**
+ * Drives the SSL handshake by looping on the engine's handshake status
+ * until it reports FINISHED or NOT_HANDSHAKING. The {@code handshaking}
+ * flag is true for the duration of the call and reset in a finally block.
+ *
+ * @throws SSLHandshakeException if a wrap does not return OK, or the
+ * channel reaches end-of-stream while handshake data is expected
+ * @throws IOException if the peer closes the connection or channel I/O fails
+ */
+ private void performHandshake() throws IOException {
+ // Generate handshake message
+ final byte[] emptyMessage = new byte[0];
+ handshaking = true;
+ logger.debug("{} Performing Handshake", this);
+
+ try {
+ while (true) {
+ switch (engine.getHandshakeStatus()) {
+ case FINISHED:
+ return;
+ case NEED_WRAP: {
+ // The engine has handshake data to send: wrap an empty application message
+ // so that only handshake bytes end up in the outbound buffer.
+ final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+
+ final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+ final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
+ if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
+ // Outbound buffer too small: prepare it again and retry on the next pass
+ streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ continue;
+ }
+
+ if (wrapHelloResult.getStatus() != Status.OK) {
+ throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
+ + wrapHelloResult.toString());
+ }
+
+ logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
+
+ // Send the wrapped handshake bytes to the peer, then reset the outbound buffer
+ final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+ final int bytesToSend = readableStreamOut.remaining();
+ writeFully(readableStreamOut);
+ logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
+
+ streamOutManager.clear();
+ }
+ continue;
+ case NEED_UNWRAP: {
+ // Try to unwrap whatever has already been read from the channel
+ final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
+ final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+ // Read handshake response from other side
+ logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
+ SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
+ logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
+
+ if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+ // Not enough encrypted data for a full record yet: pull more from the channel
+ final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ final int bytesRead = readData(writableDataIn);
+ if (bytesRead > 0) {
+ logger.trace("{} Read {} bytes for handshake", this, bytesRead);
+ }
+
+ if (bytesRead < 0) {
+ throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
+ }
+ } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
+ throw new IOException("Channel was closed by peer during handshake");
+ } else {
+ // Unwrap made progress: keep the unconsumed encrypted bytes, drop the plaintext output
+ streamInManager.compact();
+ appDataManager.clear();
+ }
+ }
+ // leaves the switch only; the enclosing loop re-checks the handshake status
+ break;
+ case NEED_TASK:
+ performTasks();
+ continue;
+ case NOT_HANDSHAKING:
+ return;
+ }
+ }
+ } finally {
+ handshaking = false;
+ }
+ }
+
+ /**
+ * Runs, on the calling thread, every delegated task the engine currently
+ * has queued.
+ */
+ private void performTasks() {
+ for (Runnable task = engine.getDelegatedTask(); task != null; task = engine.getDelegatedTask()) {
+ task.run();
+ }
+ }
+
+ /**
+ * Closes the given resource and discards any exception raised while
+ * doing so.
+ *
+ * @param closeable the resource to close
+ */
+ private void closeQuietly(final Closeable closeable) {
+ try {
+ closeable.close();
+ } catch (final Exception ignored) {
+ // best effort only: a failure to close is deliberately not reported
+ }
+ }
+
+ /**
+ * Reads from the socket channel into {@code dest}, polling until at least
+ * one byte arrives, end-of-stream is reached, or the timeout elapses.
+ *
+ * @param dest buffer to fill
+ * @return the count returned by the channel read (negative at
+ * end-of-stream), or 0 if {@code dest} has no space left
+ * @throws TransmissionDisabledException if {@link #interrupt()} was called
+ * @throws SocketTimeoutException if nothing arrives within the timeout
+ * @throws ClosedByInterruptException if the thread is interrupted while
+ * waiting; this channel is closed and the interrupt status restored first
+ * @throws IOException if the channel read fails
+ */
+ private int readData(final ByteBuffer dest) throws IOException {
+ final long began = System.currentTimeMillis();
+
+ for (;;) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
+ }
+
+ if (!dest.hasRemaining()) {
+ return 0;
+ }
+
+ final int count = channel.read(dest);
+ if (count != 0) {
+ logger.trace("{} Read {} bytes", this, count);
+ return count;
+ }
+
+ // nothing available yet: give up once the timeout has passed, otherwise wait and poll again
+ final boolean timedOut = System.currentTimeMillis() > began + timeoutMillis;
+ if (timedOut) {
+ throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
+ }
+
+ try {
+ TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+ } catch (InterruptedException interruption) {
+ close();
+ Thread.currentThread().interrupt(); // set the interrupt status
+ throw new ClosedByInterruptException();
+ }
+ }
+ }
+
+ /**
+ * Encrypts the readable content of {@code src} and writes the resulting
+ * bytes to the channel, repeating until the source is exhausted or the
+ * engine reports a status other than OK.
+ *
+ * @param src holder of the plaintext to send
+ * @return OK when everything was encrypted and written (including the case
+ * where {@code src} had nothing to read); otherwise the first non-OK status
+ * returned by the engine
+ * @throws IOException if wrapping or writing to the channel fails
+ */
+ private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
+ final ByteBuffer buff = src.prepareForRead(0);
+ final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+ logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+
+ // Start from OK so that an empty source is a successful no-op instead of
+ // dereferencing a result that was never produced.
+ Status status = Status.OK;
+ while (buff.remaining() > 0) {
+ final SSLEngineResult result = engine.wrap(buff, outBuff);
+ status = result.getStatus();
+ if (status != Status.OK) {
+ return status;
+ }
+
+ // flush what was just encrypted before wrapping the next portion
+ final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
+ writeFully(readableOutBuff);
+ streamOutManager.clear();
+ }
+
+ return status;
+ }
+
+ /**
+ * Writes every remaining byte of {@code src} to the socket channel,
+ * polling while the channel accepts nothing. The timeout is measured from
+ * the last moment any byte was accepted.
+ *
+ * @param src the bytes to send
+ * @throws TransmissionDisabledException if {@link #interrupt()} was called
+ * @throws SocketTimeoutException if no byte is accepted within the timeout
+ * @throws ClosedByInterruptException if the thread is interrupted while
+ * waiting; this channel is closed and the interrupt status restored first
+ * @throws IOException if the channel write fails
+ */
+ private void writeFully(final ByteBuffer src) throws IOException {
+ long lastProgressAt = System.currentTimeMillis();
+ int total = 0;
+
+ while (src.hasRemaining()) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
+ }
+
+ final int accepted = channel.write(src);
+ total += accepted;
+ final long checkedAt = System.currentTimeMillis();
+
+ if (accepted > 0) {
+ lastProgressAt = checkedAt;
+ continue;
+ }
+
+ if (checkedAt > lastProgressAt + timeoutMillis) {
+ throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
+ }
+
+ try {
+ TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+ } catch (final InterruptedException interruption) {
+ close();
+ Thread.currentThread().interrupt(); // set the interrupt status
+ throw new ClosedByInterruptException();
+ }
+ }
+
+ logger.trace("{} Wrote {} bytes", this, total);
+ }
+
+ /**
+ * Reports whether the connection is closed, probing the channel to find
+ * out. This is not a pure query: it reads from the socket channel into the
+ * inbound buffer and may unwrap that data into the application buffer.
+ * When end-of-stream, a read error, an unwrap error or a peer closure is
+ * detected, the socket and channel are closed and {@code true} is returned.
+ *
+ * @return {@code true} if the channel was already closed or has just been
+ * closed by this call; {@code false} if nothing was read or the data read
+ * did not indicate closure
+ */
+ public boolean isClosed() {
+ if (closed) {
+ return true;
+ }
+ // need to detect if peer has sent closure handshake...if so the answer is true
+ final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ int readCount = 0;
+ try {
+ readCount = channel.read(writableInBuffer);
+ } catch (IOException e) {
+ logger.error("{} Failed to readData due to {}", new Object[]{this, e});
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ readCount = -1; // treat the condition same as if End of Stream
+ }
+ if (readCount == 0) {
+ // nothing pending on the socket, so no evidence of closure
+ return false;
+ }
+ if (readCount > 0) {
+ logger.trace("{} Read {} bytes", this, readCount);
+
+ final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+ final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ try {
+ SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+ logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+ if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+ // Drain the incoming TCP buffer
+ final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+ int bytesDiscarded = channel.read(discardBuffer);
+ while (bytesDiscarded > 0) {
+ discardBuffer.clear();
+ bytesDiscarded = channel.read(discardBuffer);
+ }
+ engine.closeInbound();
+ } else {
+ // the data was not a closure: keep any unconsumed encrypted bytes and report "open"
+ streamInManager.compact();
+ return false;
+ }
+ } catch (IOException e) {
+ // an unwrap/drain failure falls through to the shutdown code below
+ logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ }
+ }
+ // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
+ // so go ahead and close down the channel
+ closeQuietly(channel.socket());
+ closeQuietly(channel);
+ closed = true;
+ return true;
+ }
+
+ /**
+ * Shuts the connection down. The engine's outbound side is closed and the
+ * bytes it produces for that closure are written to the peer. Whether or
+ * not that succeeds, any pending inbound bytes are then drained and
+ * discarded, the socket and channel are closed, and the channel is marked
+ * closed. Calling this on an already-closed channel does nothing.
+ *
+ * @throws IOException if the engine does not report CLOSED when wrapping
+ * the closure message, or if writing it to the channel fails
+ */
+ @Override
+ public void close() throws IOException {
+ logger.debug("{} Closing Connection", this);
+ if (channel == null) {
+ return;
+ }
+
+ if (closed) {
+ return;
+ }
+
+ try {
+ engine.closeOutbound();
+
+ // wrapping an empty message after closeOutbound() yields the engine's closure bytes
+ final byte[] emptyMessage = new byte[0];
+
+ final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+ final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
+
+ if (handshakeResult.getStatus() != Status.CLOSED) {
+ throw new IOException("Invalid close state - will not send network data");
+ }
+
+ final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+ writeFully(readableStreamOut);
+ } finally {
+ // Drain the incoming TCP buffer
+ final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+ try {
+ int bytesDiscarded = channel.read(discardBuffer);
+ while (bytesDiscarded > 0) {
+ discardBuffer.clear();
+ bytesDiscarded = channel.read(discardBuffer);
+ }
+ } catch (Exception e) {
+ // draining is best effort; the channel is closed below regardless
+ }
+
+ closeQuietly(channel.socket());
+ closeQuietly(channel);
+ closed = true;
+ }
+ }
+
+ /**
+ * Moves already-decrypted bytes from the application data buffer into the
+ * caller's array.
+ *
+ * @param buffer destination array
+ * @param offset index in {@code buffer} at which to start storing bytes
+ * @param len maximum number of bytes to copy
+ * @return the number of bytes copied, or 0 if no decrypted data is buffered
+ */
+ private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
+ final ByteBuffer decrypted = appDataManager.prepareForRead(1);
+
+ final int before = decrypted.remaining();
+ if (before <= 0) {
+ return 0;
+ }
+
+ final int requested = Math.min(len, before);
+ decrypted.get(buffer, offset, requested);
+
+ // measure how far the buffer actually advanced rather than trusting the request size
+ final int consumed = before - decrypted.remaining();
+ logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
+ new Object[]{this, requested, consumed});
+ return consumed;
+ }
+
+ /**
+ * Returns the number of bytes currently held in the decrypted and
+ * encrypted inbound buffers combined. If both are empty, one attempt is
+ * made to read from the channel before counting again.
+ *
+ * @return the buffered byte count, or 0 if nothing is buffered and
+ * nothing could be read
+ * @throws IOException if reading from the channel fails
+ */
+ public int available() throws IOException {
+ final int alreadyBuffered = appDataManager.prepareForRead(1).remaining()
+ + streamInManager.prepareForRead(1).remaining();
+ if (alreadyBuffered > 0) {
+ return alreadyBuffered;
+ }
+
+ if (!isDataAvailable()) {
+ return 0;
+ }
+
+ // the probe above pulled data in from the channel, so count again
+ final int decryptedCount = appDataManager.prepareForRead(1).remaining();
+ final int encryptedCount = streamInManager.prepareForRead(1).remaining();
+ return decryptedCount + encryptedCount;
+ }
+
+ /**
+ * Reports whether any inbound data is at hand: either bytes already held
+ * in the decrypted or encrypted buffers, or bytes obtained by a single
+ * read attempt on the channel.
+ *
+ * @return {@code true} if data is buffered or was just read
+ * @throws IOException if reading from the channel fails
+ */
+ public boolean isDataAvailable() throws IOException {
+ final int decryptedPending = appDataManager.prepareForRead(1).remaining();
+ final int encryptedPending = streamInManager.prepareForRead(1).remaining();
+ if (decryptedPending > 0 || encryptedPending > 0) {
+ return true;
+ }
+
+ final ByteBuffer target = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ return channel.read(target) > 0;
+ }
+
+ /**
+ * Reads a single byte of application data.
+ *
+ * @return the byte as a value from 0 to 255, or -1 at end-of-stream
+ * @throws IOException if the underlying read fails
+ */
+ public int read() throws IOException {
+ return (read(oneByteBuffer) == -1) ? -1 : (oneByteBuffer[0] & 0xFF);
+ }
+
+ /**
+ * Reads application data into the whole of {@code buffer} by delegating
+ * to {@link #read(byte[], int, int)} with offset 0 and length
+ * {@code buffer.length}.
+ *
+ * @param buffer destination array
+ * @return the value returned by the delegated read
+ * @throws IOException if the delegated read fails
+ */
+ public int read(final byte[] buffer) throws IOException {
+ return read(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Reads decrypted application data into {@code buffer}, connecting first
+ * if the channel is not yet connected. Data left over from an earlier
+ * unwrap is returned immediately; otherwise encrypted data is unwrapped,
+ * reading more from the channel whenever the engine reports an underflow.
+ * <p>
+ * NOTE(review): with {@code len == 0} nothing can be copied, so this falls
+ * through to the unwrap loop and may block or fail with "Failed to decrypt
+ * data" - callers should presumably avoid zero-length reads; confirm.
+ *
+ * @param buffer destination array
+ * @param offset index in {@code buffer} at which to start storing bytes
+ * @param len maximum number of bytes to read
+ * @return the number of bytes copied into {@code buffer}, or -1 if the
+ * channel reached end-of-stream while more encrypted data was needed
+ * @throws SSLHandshakeException if the engine reports a buffer overflow
+ * @throws IOException if the engine reports CLOSED, if an OK unwrap yields
+ * no bytes to copy, or if channel I/O fails
+ */
+ public int read(final byte[] buffer, final int offset, final int len) throws IOException {
+ logger.debug("{} Reading up to {} bytes of data", this, len);
+
+ if (!connected) {
+ connect();
+ }
+
+ // serve leftovers from a previous unwrap before touching the network
+ int copied = copyFromAppDataBuffer(buffer, offset, len);
+ if (copied > 0) {
+ return copied;
+ }
+
+ appDataManager.clear();
+
+ while (true) {
+ // prepare buffers and call unwrap
+ final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+ SSLEngineResult unwrapResponse = null;
+ final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+ logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
+
+ switch (unwrapResponse.getStatus()) {
+ case BUFFER_OVERFLOW:
+ throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
+ case BUFFER_UNDERFLOW: {
+// appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
+
+ // not enough encrypted data for a full record: read more, then unwrap again
+ final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ final int bytesRead = readData(writableInBuffer);
+ if (bytesRead < 0) {
+ return -1;
+ }
+
+ continue;
+ }
+ case CLOSED:
+ throw new IOException("Channel is closed");
+ case OK: {
+ copied = copyFromAppDataBuffer(buffer, offset, len);
+ if (copied == 0) {
+ throw new IOException("Failed to decrypt data");
+ }
+ // keep any encrypted bytes that were not consumed by this unwrap
+ streamInManager.compact();
+ return copied;
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes a single byte, taken from the low-order eight bits of
+ * {@code data}, by delegating to {@link #write(byte[], int, int)}.
+ *
+ * @param data the byte to write
+ * @throws IOException if the delegated write fails
+ */
+ public void write(final int data) throws IOException {
+ write(new byte[]{(byte) data}, 0, 1);
+ }
+
+ /**
+ * Writes the whole array by delegating to
+ * {@link #write(byte[], int, int)} with offset 0 and length
+ * {@code data.length}.
+ *
+ * @param data the bytes to write
+ * @throws IOException if the delegated write fails
+ */
+ public void write(final byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ /**
+ * Encrypts and sends {@code len} bytes of {@code data} starting at
+ * {@code offset}, connecting first if the channel is not yet connected.
+ * The data is processed in slices of at most {@link #MAX_WRITE_SIZE} bytes.
+ *
+ * @param data the array holding the bytes to send
+ * @param offset index of the first byte to send
+ * @param len number of bytes to send
+ * @throws IOException if the engine reports that the channel is closed or
+ * writing to the channel fails
+ */
+ public void write(final byte[] data, final int offset, final int len) throws IOException {
+ logger.debug("{} Writing {} bytes of data", this, len);
+
+ if (!connected) {
+ connect();
+ }
+
+ int iterations = len / MAX_WRITE_SIZE;
+ if (len % MAX_WRITE_SIZE > 0) {
+ iterations++;
+ }
+
+ for (int i = 0; i < iterations; i++) {
+ streamOutManager.clear();
+ // Bytes handed over by earlier iterations. The slice length must be derived from this
+ // count, not from the absolute array position, or a non-zero offset shortens the slice.
+ final int alreadySent = i * MAX_WRITE_SIZE;
+ final int itrOffset = offset + alreadySent;
+ final int itrLen = Math.min(len - alreadySent, MAX_WRITE_SIZE);
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
+
+ final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
+ final Status status = encryptAndWriteFully(buffMan);
+ switch (status) {
+ case BUFFER_OVERFLOW:
+ // NOTE(review): the buffers are enlarged but the loop then moves on to the next
+ // slice; whether the remainder of this slice is ever sent should be confirmed.
+ streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+ appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+ continue;
+ case OK:
+ continue;
+ case CLOSED:
+ throw new IOException("Channel is closed");
+ case BUFFER_UNDERFLOW:
+ throw new AssertionError("Got Buffer Underflow but should not have...");
+ }
+ }
+ }
+
+ /**
+ * Flags this channel as interrupted. Connecting, reading and writing
+ * check the flag while polling and throw a
+ * {@link TransmissionDisabledException} once it is set. The flag is never
+ * reset by this class.
+ */
+ public void interrupt() {
+ this.interrupted = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..154bd08
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} view of an {@link SSLSocketChannel}: every operation
+ * is forwarded to the channel.
+ */
+public class SSLSocketChannelInputStream extends InputStream {
+
+ private final SSLSocketChannel channel;
+
+ /**
+ * @param channel the channel to read from
+ */
+ public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+ this.channel = channel;
+ }
+
+ /**
+ * Reads one byte from the channel.
+ *
+ * @return the byte as a value from 0 to 255, or -1 at end-of-stream
+ * @throws IOException if the channel read fails
+ */
+ @Override
+ public int read() throws IOException {
+ return channel.read();
+ }
+
+ /**
+ * Reads into the whole of {@code b}; equivalent to
+ * {@code read(b, 0, b.length)}.
+ *
+ * @param b destination array
+ * @return the number of bytes read, 0 if {@code b} is empty, or -1 at
+ * end-of-stream
+ * @throws IOException if the channel read fails
+ */
+ @Override
+ public int read(final byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ /**
+ * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+ *
+ * @param b destination array
+ * @param off index at which to start storing bytes
+ * @param len maximum number of bytes to read
+ * @return the number of bytes read, 0 if {@code len} is 0, or -1 at
+ * end-of-stream
+ * @throws IOException if the channel read fails
+ */
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ if (len == 0) {
+ // InputStream contract: a zero-length read returns 0 without reading anything.
+ // Forwarding it would make the channel wait for, or fail on, data nobody asked for.
+ return 0;
+ }
+ return channel.read(b, off, len);
+ }
+
+ /**
+ * Closes the underlying SSLSocketChannel, which will also close the
+ * OutputStream and connection
+ */
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ /**
+ * @return the number of bytes the channel reports as available
+ * @throws IOException if the channel cannot be queried
+ */
+ @Override
+ public int available() throws IOException {
+ return channel.available();
+ }
+
+ /**
+ * @return {@code true} if {@link #available()} reports at least one byte
+ * @throws IOException if the channel cannot be queried
+ */
+ public boolean isDataAvailable() throws IOException {
+ return available() > 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..ce4e420
--- /dev/null
+++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An {@link OutputStream} view of an {@link SSLSocketChannel}: every
+ * operation is forwarded to the channel.
+ */
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+ private final SSLSocketChannel channel;
+
+ /**
+ * @param channel the channel to write to
+ */
+ public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+ this.channel = channel;
+ }
+
+ /**
+ * Writes a single byte to the channel.
+ *
+ * @param b the byte to write
+ * @throws IOException if the channel write fails
+ */
+ @Override
+ public void write(final int b) throws IOException {
+ channel.write(b);
+ }
+
+ /**
+ * Writes the whole array to the channel.
+ *
+ * @param b the bytes to write
+ * @throws IOException if the channel write fails
+ */
+ @Override
+ public void write(byte[] b) throws IOException {
+ channel.write(b);
+ }
+
+ /**
+ * Writes {@code len} bytes of {@code b}, starting at {@code off}, to the
+ * channel.
+ *
+ * @param b the array holding the bytes to write
+ * @param off index of the first byte to write
+ * @param len number of bytes to write
+ * @throws IOException if the channel write fails
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ channel.write(b, off, len);
+ }
+
+ /**
+ * Closes the underlying SSLSocketChannel, which also will close the
+ * InputStream and the connection
+ */
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
new file mode 100644
index 0000000..bd30a96
--- /dev/null
+++ b/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+
+import org.junit.Test;
+
+/**
+ * Round-trip tests: data written through a CompressionOutputStream must come
+ * back unchanged when read through a CompressionInputStream.
+ */
+public class TestCompressionInputOutputStreams {
+
+ private static final String PANGRAM = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
+ private static final int STREAM_BUFFER_SIZE = 8192;
+
+ /** A short message written with the default buffer size survives the round trip. */
+ @Test
+ public void testSimple() throws IOException {
+ final byte[] original = "Hello, World!".getBytes("UTF-8");
+ final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+ final CompressionOutputStream compressor = new CompressionOutputStream(sink);
+ compressor.write(original);
+ compressor.flush();
+ compressor.close();
+
+ assertTrue(Arrays.equals(original, decompress(sink.toByteArray())));
+ }
+
+ /** One write of 100 repetitions of the pangram survives the round trip. */
+ @Test
+ public void testDataLargerThanBuffer() throws IOException {
+ final byte[] original = repeat(PANGRAM, 100).getBytes("UTF-8");
+ final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+ final CompressionOutputStream compressor = new CompressionOutputStream(sink, STREAM_BUFFER_SIZE);
+ compressor.write(original);
+ compressor.flush();
+ compressor.close();
+
+ assertTrue(Arrays.equals(original, decompress(sink.toByteArray())));
+ }
+
+ /** 1024 small writes, each followed by a flush, survive the round trip. */
+ @Test
+ public void testDataLargerThanBufferWhileFlushing() throws IOException {
+ final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+ compressInFlushedChunks(sink, PANGRAM.getBytes("UTF-8"), 1024);
+
+ final byte[] expected = repeat(PANGRAM, 1024).getBytes("UTF-8");
+ assertTrue(Arrays.equals(expected, decompress(sink.toByteArray())));
+ }
+
+ /**
+ * Two compressed payloads written back to back onto the same sink can be
+ * read one after the other from the same source stream.
+ */
+ @Test
+ public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException {
+ final byte[] chunk = PANGRAM.getBytes("UTF-8");
+ final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+ compressInFlushedChunks(sink, chunk, 512);
+ compressInFlushedChunks(sink, chunk, 512);
+
+ final byte[] expectedPerFile = repeat(PANGRAM, 512).getBytes("UTF-8");
+ final ByteArrayInputStream source = new ByteArrayInputStream(sink.toByteArray());
+
+ final byte[] first = readFully(new CompressionInputStream(source));
+ assertTrue(Arrays.equals(expectedPerFile, first));
+
+ final byte[] second = readFully(new CompressionInputStream(source));
+ assertTrue(Arrays.equals(expectedPerFile, second));
+ }
+
+ /** Writes {@code chunk} {@code times} times through a new compressor, flushing after each write, then closes it. */
+ private void compressInFlushedChunks(final ByteArrayOutputStream sink, final byte[] chunk, final int times) throws IOException {
+ final CompressionOutputStream compressor = new CompressionOutputStream(sink, STREAM_BUFFER_SIZE);
+ for (int written = 0; written < times; written++) {
+ compressor.write(chunk);
+ compressor.flush();
+ }
+ compressor.close();
+ }
+
+ /** Decompresses a complete compressed payload. */
+ private byte[] decompress(final byte[] compressed) throws IOException {
+ return readFully(new CompressionInputStream(new ByteArrayInputStream(compressed)));
+ }
+
+ /** Returns {@code text} concatenated {@code times} times. */
+ private String repeat(final String text, final int times) {
+ final StringBuilder joined = new StringBuilder();
+ for (int appended = 0; appended < times; appended++) {
+ joined.append(text);
+ }
+ return joined.toString();
+ }
+
+ /** Reads {@code in} until it signals end-of-stream and returns everything read. */
+ private byte[] readFully(final InputStream in) throws IOException {
+ final ByteArrayOutputStream collected = new ByteArrayOutputStream();
+ final byte[] chunk = new byte[65536];
+
+ for (int count = in.read(chunk); count >= 0; count = in.read(chunk)) {
+ collected.write(chunk, 0, count);
+ }
+
+ return collected.toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/search-utils/pom.xml b/commons/search-utils/pom.xml
new file mode 100644
index 0000000..569958f
--- /dev/null
+++ b/commons/search-utils/pom.xml
@@ -0,0 +1,32 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-search-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>search-utils</name>
+
+ <dependencies>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java
new file mode 100644
index 0000000..59b444a
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
+
+import org.apache.nifi.util.search.ahocorasick.SearchState;
+
+/**
+ * Defines an interface to search for content given a set of search terms. Any
+ * implementation of search must be thread safe.
+ *
+ * @param <T> the type of the reference object carried by each
+ * {@link SearchTerm} in the dictionary
+ */
+public interface Search<T> {
+
+ /**
+ * Establishes the dictionary of terms which will be searched in subsequent
+ * search calls. This can be called only once.
+ *
+ * @param terms the search terms that make up the dictionary
+ */
+ void initializeDictionary(Set<SearchTerm<T>> terms);
+
+ /**
+ * Searches the given input stream for matches between the already specified
+ * dictionary and the contents scanned.
+ *
+ * @param haystack the stream whose content is scanned for dictionary terms
+ * @param findAll if true will find all matches; if false will find only the
+ * first match
+ * @return SearchState containing the results; its result map might be empty,
+ * which indicates no matches were found, but will not be null
+ * @throws IOException Thrown for any exceptions occurring while searching.
+ * @throws IllegalStateException if the dictionary has not yet been
+ * initialized
+ */
+ SearchState<T> search(InputStream haystack, boolean findAll) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
new file mode 100644
index 0000000..62de964
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+/**
+ * This is an immutable thread safe object representing a search term
+ *
+ * @author
+ * @param <T>
+ */
+public class SearchTerm<T> {
+
+ private final byte[] bytes;
+ private final int hashCode;
+ private final T reference;
+
+ /**
+ * Constructs a SearchTerm. Defensively copies the given byte array
+ *
+ * @param bytes
+ * @throws IllegalArgument exception if given bytes are null or 0 length
+ */
+ public SearchTerm(final byte[] bytes) {
+ this(bytes, true, null);
+ }
+
+ /**
+ * Constructs a search term. Optionally performs a defensive copy of the
+ * given byte array. If the caller indicates a defensive copy is not
+ * necessary then they must not change the given arrays state any longer
+ *
+ * @param bytes
+ * @param defensiveCopy
+ * @param reference
+ */
+ public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) {
+ if (bytes == null || bytes.length == 0) {
+ throw new IllegalArgumentException();
+ }
+ if (defensiveCopy) {
+ this.bytes = Arrays.copyOf(bytes, bytes.length);
+ } else {
+ this.bytes = bytes;
+ }
+ this.hashCode = Arrays.hashCode(this.bytes);
+ this.reference = reference;
+ }
+
+ public int get(final int index) {
+ return bytes[index] & 0xff;
+ }
+
+ /**
+ * @return size in of search term in bytes
+ */
+ public int size() {
+ return bytes.length;
+ }
+
+ /**
+ * @return reference object for this given search term
+ */
+ public T getReference() {
+ return reference;
+ }
+
+ /**
+ * Determines if the given window starts with the same bytes as this term
+ *
+ * @param window Current window of bytes from the haystack being evaluated.
+ * @param windowLength The length of the window to consider
+ * @return true if this term starts with the same bytes of the given window
+ */
+ public boolean startsWith(byte[] window, int windowLength) {
+ if (windowLength > window.length) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (bytes.length < windowLength) {
+ return false;
+ }
+ for (int i = 0; i < bytes.length && i < windowLength; i++) {
+ if (bytes[i] != window[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @return a defensive copy of the internal byte structure
+ */
+ public byte[] getBytes() {
+ return Arrays.copyOf(bytes, bytes.length);
+ }
+
+ @Override
+ public int hashCode() {
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final SearchTerm other = (SearchTerm) obj;
+ if (this.hashCode != other.hashCode) {
+ return false;
+ }
+ return Arrays.equals(this.bytes, other.bytes);
+ }
+
+ @Override
+ public String toString() {
+ return new String(bytes);
+ }
+
+ public String toString(final Charset charset) {
+ return new String(bytes, charset);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
new file mode 100644
index 0000000..3b8afaf
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.nifi.util.search.Search;
+import org.apache.nifi.util.search.SearchTerm;
+
+public class AhoCorasick<T> implements Search<T> {
+
+ private Node root = null;
+
+ /**
+ * Constructs a new search object.
+ *
+ * @throws IllegalArgumentException if given terms are null or empty
+ */
+ public AhoCorasick() {
+ }
+
+ @Override
+ public void initializeDictionary(final Set<SearchTerm<T>> terms) {
+ if (root != null) {
+ throw new IllegalStateException();
+ }
+ root = new Node();
+ if (terms == null || terms.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
+ for (final SearchTerm<T> term : terms) {
+ int i = 0;
+ Node nextNode = root;
+ while (true) {
+ nextNode = addMatch(term, i, nextNode);
+ if (nextNode == null) {
+ break; //we're done
+ }
+ i++;
+ }
+ }
+ initialize();
+ }
+
+ private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) {
+ final int index = term.get(offset);
+ boolean atEnd = (offset == (term.size() - 1));
+ if (current.getNeighbor(index) == null) {
+ if (atEnd) {
+ current.setNeighbor(new Node(term), index);
+ return null;
+ }
+ current.setNeighbor(new Node(), index);
+ } else if (atEnd) {
+ current.getNeighbor(index).setMatchingTerm(term);
+ return null;
+ }
+ return current.getNeighbor(index);
+ }
+
+ private void initialize() {
+ //perform bgs to build failure links
+ final Queue<Node> queue = new LinkedList<>();
+ queue.add(root);
+ root.setFailureNode(null);
+ while (!queue.isEmpty()) {
+ final Node current = queue.poll();
+ for (int i = 0; i < 256; i++) {
+ final Node next = current.getNeighbor(i);
+ if (next != null) {
+ //traverse failure to get state
+ Node fail = current.getFailureNode();
+ while ((fail != null) && fail.getNeighbor(i) == null) {
+ fail = fail.getFailureNode();
+ }
+ if (fail != null) {
+ next.setFailureNode(fail.getNeighbor(i));
+ } else {
+ next.setFailureNode(root);
+ }
+ queue.add(next);
+ }
+ }
+ }
+ }
+
+ @Override
+ public SearchState search(final InputStream stream, final boolean findAll) throws IOException {
+ return search(stream, findAll, null);
+ }
+
+ private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException {
+ if (root == null) {
+ throw new IllegalStateException();
+ }
+ final SearchState<T> currentState = (state == null) ? new SearchState(root) : state;
+ if (!findAll && currentState.foundMatch()) {
+ throw new IllegalStateException("A match has already been found yet we're being asked to keep searching");
+ }
+ Node current = currentState.getCurrentNode();
+ int currentChar;
+ while ((currentChar = stream.read()) >= 0) {
+ currentState.incrementBytesRead(1L);
+ Node next = current.getNeighbor(currentChar);
+ if (next == null) {
+ next = current.getFailureNode();
+ while ((next != null) && next.getNeighbor(currentChar) == null) {
+ next = next.getFailureNode();
+ }
+ if (next != null) {
+ next = next.getNeighbor(currentChar);
+ } else {
+ next = root;
+ }
+ }
+ if (next == null) {
+ throw new IllegalStateException("tree out of sync");
+ }
+ //Accept condition
+ if (next.hasMatch()) {
+ currentState.addResult(next.getMatchingTerm());
+ }
+ for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) {
+ if (failNode.hasMatch()) {
+ currentState.addResult(failNode.getMatchingTerm());
+ }
+ }
+ current = next;
+ if (currentState.foundMatch() && !findAll) {
+ break;//give up as soon as we have at least one match
+ }
+ }
+ currentState.setCurrentNode(current);
+ return currentState;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
new file mode 100644
index 0000000..0ac325c
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+/**
+ * One state of the search trie. A node keeps its outgoing edges in a map keyed
+ * by integer index, an optional failure link to another node, and an optional
+ * search term that is considered matched when this node is reached.
+ */
+public class Node {
+
+    private final Map<Integer, Node> neighborMap = new HashMap<>();
+    private SearchTerm<?> term;
+    private Node failureNode;
+
+    /**
+     * Creates a node that does not complete any search term.
+     */
+    Node() {
+        this(null);
+    }
+
+    /**
+     * Creates a node that completes the given search term.
+     *
+     * @param term the term matched at this node; null for none
+     */
+    Node(final SearchTerm<?> term) {
+        this.term = term;
+    }
+
+    /**
+     * @return the node linked as this node's failure node, or null if none
+     * has been set
+     */
+    public Node getFailureNode() {
+        return failureNode;
+    }
+
+    /**
+     * @param fail the node to link as this node's failure node
+     */
+    void setFailureNode(final Node fail) {
+        this.failureNode = fail;
+    }
+
+    /**
+     * @return the term matched at this node, or null if there is none
+     */
+    public SearchTerm<?> getMatchingTerm() {
+        return term;
+    }
+
+    /**
+     * @param term the term to report as matched when this node is reached
+     */
+    void setMatchingTerm(final SearchTerm<?> term) {
+        this.term = term;
+    }
+
+    /**
+     * @return true if a matching term is attached to this node
+     */
+    public boolean hasMatch() {
+        return getMatchingTerm() != null;
+    }
+
+    /**
+     * @param index the edge index to look up
+     * @return the node registered under that index, or null if there is none
+     */
+    public Node getNeighbor(final int index) {
+        final Integer key = index;
+        return neighborMap.get(key);
+    }
+
+    /**
+     * Registers a neighbor under the given index, replacing any previous one.
+     *
+     * @param neighbor the node to register
+     * @param index the edge index under which to register it
+     */
+    void setNeighbor(final Node neighbor, final int index) {
+        final Integer key = index;
+        neighborMap.put(key, neighbor);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
----------------------------------------------------------------------
diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
new file mode 100644
index 0000000..6d36ad0
--- /dev/null
+++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+/**
+ * Mutable bookkeeping for one search: the trie node currently reached, the
+ * number of bytes counted so far, and, per matched term, the list of byte
+ * counts at which that term was recorded.
+ *
+ * @param <T> type of the reference object carried by the search terms
+ */
+public class SearchState<T> {
+
+    private final Map<SearchTerm<T>, List<Long>> resultMap = new HashMap<>(5);
+    private Node currentNode;
+    private long bytesRead = 0L;
+
+    /**
+     * @param rootNode the node at which the search starts
+     */
+    SearchState(final Node rootNode) {
+        this.currentNode = rootNode;
+    }
+
+    /**
+     * @param increment number of bytes to add to the running byte count
+     */
+    void incrementBytesRead(final long increment) {
+        this.bytesRead = this.bytesRead + increment;
+    }
+
+    /**
+     * @return the node most recently stored in this state
+     */
+    public Node getCurrentNode() {
+        return currentNode;
+    }
+
+    /**
+     * @param curr the node to store as the current position
+     */
+    void setCurrentNode(final Node curr) {
+        this.currentNode = curr;
+    }
+
+    /**
+     * @return a new map holding the current results; the map itself is a copy
+     * but the position lists inside it are the ones held by this state
+     */
+    public Map<SearchTerm<T>, List<Long>> getResults() {
+        final Map<SearchTerm<T>, List<Long>> snapshot = new HashMap<>(resultMap);
+        return snapshot;
+    }
+
+    /**
+     * Records the current byte count as a match position for the given term.
+     *
+     * @param matchingTerm the term that was matched
+     */
+    void addResult(final SearchTerm matchingTerm) {
+        List<Long> positions = resultMap.get(matchingTerm);
+        if (positions == null) {
+            // First hit for this term: register a fresh position list.
+            positions = new ArrayList<Long>(5);
+            resultMap.put(matchingTerm, positions);
+        }
+        positions.add(bytesRead);
+    }
+
+    /**
+     * @return true once at least one match has been recorded
+     */
+    public boolean foundMatch() {
+        return resultMap.size() > 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/.gitignore
----------------------------------------------------------------------
diff --git a/commons/wali/.gitignore b/commons/wali/.gitignore
new file mode 100755
index 0000000..19f2e00
--- /dev/null
+++ b/commons/wali/.gitignore
@@ -0,0 +1,2 @@
+/target
+/target
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/pom.xml
----------------------------------------------------------------------
diff --git a/commons/wali/pom.xml b/commons/wali/pom.xml
new file mode 100644
index 0000000..ce04973
--- /dev/null
+++ b/commons/wali/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Build settings are inherited from the shared nifi-parent POM. -->
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <!-- This module sets its own groupId and version; both differ from the parent's. -->
+ <groupId>wali</groupId>
+ <artifactId>wali</artifactId>
+
+ <version>3.0.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>WALI : Write-Ahead Log Implementation</name>
+
+ <dependencies>
+ <!-- NOTE(review): no version is given here; presumably it is managed by the parent POM. Confirm. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-stream-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>