You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/11/21 21:31:08 UTC
[5/6] nifi git commit: NIFI-2954 This closes #1244. Moved
StandardPropertyValidator to nifi-utils,
documented scope/purpose of a few util libs, removed deps from nifi-utils.
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..7a09f5f
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+import org.apache.nifi.security.util.CertificateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLSocketChannel implements Closeable {
+
+ public static final int MAX_WRITE_SIZE = 65536;
+
+ private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
+ private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+ private final String hostname;
+ private final int port;
+ private final SSLEngine engine;
+ private final SocketAddress socketAddress;
+
+ private BufferStateManager streamInManager;
+ private BufferStateManager streamOutManager;
+ private BufferStateManager appDataManager;
+
+ private SocketChannel channel;
+
+ private final byte[] oneByteBuffer = new byte[1];
+
+ private int timeoutMillis = 30000;
+ private volatile boolean connected = false;
+ private boolean handshaking = false;
+ private boolean closed = false;
+ private volatile boolean interrupted = false;
+
+ /**
+ * Creates a channel for the given host and port. Only the underlying SocketChannel is opened here; the
+ * TCP connection and the SSL handshake are performed by {@link #connect()}, which the read and write
+ * methods invoke when the channel is not yet connected. The SSLEngine is configured to require client
+ * authentication.
+ *
+ * @param sslContext the context used to create the SSLEngine
+ * @param hostname the remote host to connect to
+ * @param port the remote port to connect to
+ * @param client whether the SSLEngine should operate in client mode
+ * @throws IOException if the underlying SocketChannel cannot be opened
+ */
+ public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
+ this.hostname = hostname;
+ this.port = port;
+ this.socketAddress = new InetSocketAddress(hostname, port);
+ this.channel = SocketChannel.open();
+
+ final SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(client);
+ sslEngine.setNeedClientAuth(true);
+ this.engine = sslEngine;
+
+ // network-side buffers are sized for SSL packets, the plaintext buffer for application data
+ final int packetBufferSize = sslEngine.getSession().getPacketBufferSize();
+ final int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize();
+ this.streamInManager = new BufferStateManager(ByteBuffer.allocate(packetBufferSize));
+ this.streamOutManager = new BufferStateManager(ByteBuffer.allocate(packetBufferSize));
+ this.appDataManager = new BufferStateManager(ByteBuffer.allocate(applicationBufferSize));
+ }
+
+ /**
+ * Wraps an already-connected SocketChannel. The SSL handshake is not started here; it is performed by
+ * {@link #connect()}. The SSLEngine is configured to require client authentication.
+ *
+ * @param sslContext the context used to create the SSLEngine
+ * @param socketChannel a connected channel to the remote peer
+ * @param client whether the SSLEngine should operate in client mode
+ * @throws IllegalArgumentException if the given channel is not connected
+ * @throws IOException if the remote address of the channel cannot be obtained
+ */
+ public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
+ if (!socketChannel.isConnected()) {
+ throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+ }
+
+ this.channel = socketChannel;
+ this.socketAddress = socketChannel.getRemoteAddress();
+
+ final Socket remoteSocket = socketChannel.socket();
+ this.hostname = remoteSocket.getInetAddress().getHostName();
+ this.port = remoteSocket.getPort();
+
+ final SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(client);
+ sslEngine.setNeedClientAuth(true);
+ this.engine = sslEngine;
+
+ // network-side buffers are sized for SSL packets, the plaintext buffer for application data
+ final int packetBufferSize = sslEngine.getSession().getPacketBufferSize();
+ final int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize();
+ this.streamInManager = new BufferStateManager(ByteBuffer.allocate(packetBufferSize));
+ this.streamOutManager = new BufferStateManager(ByteBuffer.allocate(packetBufferSize));
+ this.appDataManager = new BufferStateManager(ByteBuffer.allocate(applicationBufferSize));
+ }
+
+ /**
+ * Wraps an already-connected SocketChannel using a caller-supplied SSLEngine. The engine is used as is:
+ * neither the client mode nor the client-authentication requirement is touched, so the caller is
+ * responsible for configuring it.
+ *
+ * @param sslEngine the pre-configured engine to use for this channel
+ * @param socketChannel a connected channel to the remote peer
+ * @throws IllegalArgumentException if the given channel is not connected
+ * @throws IOException if the remote address of the channel cannot be obtained
+ */
+ public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel) throws IOException {
+ if (!socketChannel.isConnected()) {
+ throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+ }
+
+ this.channel = socketChannel;
+ this.socketAddress = socketChannel.getRemoteAddress();
+
+ final Socket remoteSocket = socketChannel.socket();
+ this.hostname = remoteSocket.getInetAddress().getHostName();
+ this.port = remoteSocket.getPort();
+
+ // deliberately no setUseClientMode / setNeedClientAuth: the caller owns the engine configuration
+ this.engine = sslEngine;
+
+ final int packetBufferSize = sslEngine.getSession().getPacketBufferSize();
+ final int applicationBufferSize = sslEngine.getSession().getApplicationBufferSize();
+ this.streamInManager = new BufferStateManager(ByteBuffer.allocate(packetBufferSize));
+ this.streamOutManager = new BufferStateManager(ByteBuffer.allocate(packetBufferSize));
+ this.appDataManager = new BufferStateManager(ByteBuffer.allocate(applicationBufferSize));
+ }
+
+ /**
+ * Sets the timeout that is applied when connecting, when waiting for data to read, and when a write
+ * makes no progress.
+ *
+ * @param millis the timeout in milliseconds
+ */
+ public void setTimeout(final int millis) {
+ this.timeoutMillis = millis;
+ }
+
+ /**
+ * @return the timeout, in milliseconds, currently applied to connect, read and write operations
+ */
+ public int getTimeout() {
+ return timeoutMillis;
+ }
+
+ /**
+ * Establishes the TCP connection, if the channel is not already connected, and then performs the SSL
+ * handshake. The channel is switched to non-blocking mode, so connection establishment is polled until it
+ * completes, the configured timeout elapses, or this channel is interrupted. On any failure the channel
+ * and the SSLEngine are closed before the failure is rethrown.
+ *
+ * @throws SocketTimeoutException if the connection is not established within the configured timeout
+ * @throws TransmissionDisabledException if {@link #interrupt()} was called while connecting
+ * @throws ClosedByInterruptException if the calling thread is interrupted while waiting for the connection
+ * @throws IOException if the connection or the SSL handshake fails
+ */
+ public void connect() throws IOException {
+ try {
+ channel.configureBlocking(false);
+ if (!channel.isConnected()) {
+ final long startTime = System.currentTimeMillis();
+
+ if (!channel.connect(socketAddress)) {
+ while (!channel.finishConnect()) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
+ }
+ if (System.currentTimeMillis() > startTime + timeoutMillis) {
+ throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
+ }
+
+ try {
+ Thread.sleep(50L);
+ } catch (final InterruptedException e) {
+ // Do not swallow the interrupt: restore the flag and abort, as readData and writeFully do.
+ // The channel itself is closed by the catch block below.
+ Thread.currentThread().interrupt();
+ throw new ClosedByInterruptException();
+ }
+ }
+ }
+ }
+ engine.beginHandshake();
+
+ performHandshake();
+ logger.debug("{} Successfully completed SSL handshake", this);
+
+ streamInManager.clear();
+ streamOutManager.clear();
+ appDataManager.clear();
+
+ connected = true;
+ } catch (final Exception e) {
+ logger.error("{} Failed to connect due to {}", this, e);
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ closeQuietly(channel);
+ try {
+ engine.closeInbound();
+ } catch (final IOException closeFailure) {
+ // closeInbound() reports a missing close notification from the peer, which is expected after a
+ // failed connect. Keep it as a suppressed exception so that the original failure propagates
+ // and closeOutbound() below still runs.
+ e.addSuppressed(closeFailure);
+ }
+ engine.closeOutbound();
+ throw e;
+ }
+ }
+
+ /**
+ * Returns the subject DN of the first certificate presented by the peer, after checking that the
+ * certificate is currently valid.
+ *
+ * @return the trimmed subject DN of the peer's certificate
+ * @throws CertificateException if the certificate cannot be converted or is not currently valid
+ * @throws SSLPeerUnverifiedException if the peer presented no certificates
+ */
+ public String getDn() throws CertificateException, SSLPeerUnverifiedException {
+ final Certificate[] peerChain = engine.getSession().getPeerCertificates();
+ final boolean chainMissing = (peerChain == null) || (peerChain.length == 0);
+ if (chainMissing) {
+ throw new SSLPeerUnverifiedException("No certificates found");
+ }
+
+ final X509Certificate peerCertificate = CertificateUtils.convertAbstractX509Certificate(peerChain[0]);
+ peerCertificate.checkValidity();
+
+ final String subjectDn = peerCertificate.getSubjectDN().getName();
+ return subjectDn.trim();
+ }
+
+ /**
+ * Drives the SSL handshake by repeatedly asking the engine for its handshake status and acting on it
+ * until the engine reports FINISHED or NOT_HANDSHAKING. The {@code handshaking} flag is set for the
+ * duration of the call and always reset on exit.
+ *
+ * @throws SSLHandshakeException if handshake data cannot be generated or the stream ends mid-handshake
+ * @throws IOException if the peer closes the channel during the handshake or socket I/O fails
+ */
+ private void performHandshake() throws IOException {
+ // Generate handshake message
+ final byte[] emptyMessage = new byte[0];
+ handshaking = true;
+ logger.debug("{} Performing Handshake", this);
+
+ try {
+ while (true) {
+ switch (engine.getHandshakeStatus()) {
+ case FINISHED:
+ return;
+ case NEED_WRAP: {
+ // the engine has handshake data to send: wrap an empty application message to produce it
+ final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+
+ final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+ final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
+ if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
+ // prepare the outbound buffer again and retry the wrap on the next loop iteration
+ streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ continue;
+ }
+
+ if (wrapHelloResult.getStatus() != Status.OK) {
+ throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
+ + wrapHelloResult.toString());
+ }
+
+ logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
+
+ // send everything that was wrapped before asking the engine for its next step
+ final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+ final int bytesToSend = readableStreamOut.remaining();
+ writeFully(readableStreamOut);
+ logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
+
+ streamOutManager.clear();
+ }
+ continue;
+ case NEED_UNWRAP: {
+ // the engine expects handshake data from the peer: try whatever is already buffered first
+ final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
+ final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+ // Read handshake response from other side
+ logger.trace("{} Unwrapping: {} to {}", this, readableDataIn, appData);
+ SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
+ logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
+
+ if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+ // not enough buffered data for a complete record: read more from the socket
+ final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ final int bytesRead = readData(writableDataIn);
+ if (bytesRead > 0) {
+ logger.trace("{} Read {} bytes for handshake", this, bytesRead);
+ }
+
+ if (bytesRead < 0) {
+ throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
+ }
+ } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
+ throw new IOException("Channel was closed by peer during handshake");
+ } else {
+ // a record was consumed: compact the inbound buffer and reset the application-data buffer
+ streamInManager.compact();
+ appDataManager.clear();
+ }
+ }
+ // this break only leaves the switch; the enclosing while loop re-evaluates the handshake status
+ break;
+ case NEED_TASK:
+ performTasks();
+ continue;
+ case NOT_HANDSHAKING:
+ return;
+ }
+ }
+ } finally {
+ handshaking = false;
+ }
+ }
+
+ /**
+ * Runs, on the calling thread, every delegated task the SSLEngine currently has pending.
+ */
+ private void performTasks() {
+ for (Runnable task = engine.getDelegatedTask(); task != null; task = engine.getDelegatedTask()) {
+ task.run();
+ }
+ }
+
+ /**
+ * Closes the given resource on a best-effort basis, discarding any exception raised while doing so.
+ *
+ * @param closeable the resource to close
+ */
+ private void closeQuietly(final Closeable closeable) {
+ try {
+ closeable.close();
+ } catch (final Exception ignored) {
+ // deliberately ignored: callers use this only while tearing the connection down
+ }
+ }
+
+ /**
+ * Shuts down the input side of the underlying channel and then reads and discards whatever the channel
+ * still returns, until a read yields no more bytes.
+ *
+ * @throws IOException if shutting down the input or reading from the channel fails
+ */
+ public void consume() throws IOException {
+ channel.shutdownInput();
+
+ final ByteBuffer discardBuffer = ByteBuffer.allocate(4096);
+ int readCount;
+ do {
+ // clear() rather than flip(): the bytes are thrown away, so every read should get the whole
+ // buffer. flip() would shrink the writable window to the size of the previous read.
+ discardBuffer.clear();
+ readCount = channel.read(discardBuffer);
+ } while (readCount > 0);
+ }
+
+ /**
+ * Reads from the underlying non-blocking channel into the given buffer, polling until at least one byte
+ * arrives, the end of the stream is reached, or the configured timeout elapses.
+ *
+ * @param dest the buffer to read into
+ * @return the value returned by the channel read (a positive byte count, or a negative value at the end
+ * of the stream), or 0 if the buffer has no space remaining
+ * @throws TransmissionDisabledException if {@link #interrupt()} has been called
+ * @throws SocketTimeoutException if no data arrives within the configured timeout
+ * @throws ClosedByInterruptException if the calling thread is interrupted while waiting; this channel is
+ * closed first
+ * @throws IOException if reading from the channel fails
+ */
+ private int readData(final ByteBuffer dest) throws IOException {
+ final long begunAt = System.currentTimeMillis();
+
+ for (;;) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
+ }
+ if (dest.remaining() == 0) {
+ return 0;
+ }
+
+ final int bytesRead = channel.read(dest);
+ if (bytesRead != 0) {
+ logger.trace("{} Read {} bytes", this, bytesRead);
+ return bytesRead;
+ }
+
+ // nothing available yet: give up once the timeout has passed, otherwise wait briefly and retry
+ if (System.currentTimeMillis() > begunAt + timeoutMillis) {
+ throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
+ }
+
+ try {
+ TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+ } catch (final InterruptedException interruption) {
+ close();
+ Thread.currentThread().interrupt(); // set the interrupt status
+ throw new ClosedByInterruptException();
+ }
+ }
+ }
+
+ /**
+ * Encrypts the readable content of the given buffer manager and writes the encrypted bytes to the
+ * channel, one wrap result at a time, until the source is exhausted or the engine reports a status other
+ * than OK.
+ *
+ * @param src the manager holding the plaintext to send
+ * @return OK if everything was encrypted and written (including when there was nothing to send);
+ * otherwise the first non-OK status reported by the engine
+ * @throws IOException if wrapping fails or the encrypted data cannot be written
+ */
+ private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
+ final ByteBuffer buff = src.prepareForRead(0);
+ final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+ logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+
+ // Start from OK so that an empty source reports success instead of dereferencing a null result.
+ Status status = Status.OK;
+ while (buff.remaining() > 0) {
+ final SSLEngineResult result = engine.wrap(buff, outBuff);
+ status = result.getStatus();
+ if (status != Status.OK) {
+ return status;
+ }
+
+ final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
+ writeFully(readableOutBuff);
+ streamOutManager.clear();
+ }
+
+ return status;
+ }
+
+ /**
+ * Writes the entire remaining content of the given buffer to the underlying non-blocking channel. The
+ * timeout is measured from the last moment any byte was written, not from the start of the call.
+ *
+ * @param src the buffer whose remaining bytes are to be written
+ * @throws TransmissionDisabledException if {@link #interrupt()} has been called
+ * @throws SocketTimeoutException if no byte could be written for longer than the configured timeout
+ * @throws ClosedByInterruptException if the calling thread is interrupted while waiting; this channel is
+ * closed first
+ * @throws IOException if writing to the channel fails
+ */
+ private void writeFully(final ByteBuffer src) throws IOException {
+ long lastProgressMillis = System.currentTimeMillis();
+ int totalWritten = 0;
+
+ while (src.remaining() > 0) {
+ if (interrupted) {
+ throw new TransmissionDisabledException();
+ }
+
+ final int written = channel.write(src);
+ totalWritten += written;
+ final long now = System.currentTimeMillis();
+
+ if (written > 0) {
+ // progress was made, so the stall timer starts over
+ lastProgressMillis = now;
+ continue;
+ }
+
+ if (now > lastProgressMillis + timeoutMillis) {
+ throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
+ }
+
+ try {
+ TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+ } catch (final InterruptedException interruption) {
+ close();
+ Thread.currentThread().interrupt(); // set the interrupt status
+ throw new ClosedByInterruptException();
+ }
+ }
+
+ logger.trace("{} Wrote {} bytes", this, totalWritten);
+ }
+
+ /**
+ * Reports whether this channel is closed. This is not a passive check: unless the channel is already
+ * marked closed, it reads from the socket and, if bytes arrive, unwraps them to find out whether the
+ * peer has sent its closure message. When the end of the stream, a read failure, or a closure message is
+ * detected, the socket and channel are closed and the channel is marked closed.
+ *
+ * @return true if the channel was already closed or has been closed by this call; false otherwise
+ */
+ public boolean isClosed() {
+ if (closed) {
+ return true;
+ }
+ // need to detect if peer has sent closure handshake...if so the answer is true
+ final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ int readCount = 0;
+ try {
+ readCount = channel.read(writableInBuffer);
+ } catch (IOException e) {
+ logger.error("{} Failed to readData due to {}", new Object[]{this, e});
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ readCount = -1; // treat the condition same as if End of Stream
+ }
+ // nothing to read means the peer has sent neither data nor a closure message
+ if (readCount == 0) {
+ return false;
+ }
+ if (readCount > 0) {
+ logger.trace("{} Read {} bytes", this, readCount);
+
+ final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+ final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ try {
+ SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+ logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse);
+ if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+ // Drain the incoming TCP buffer
+ final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+ int bytesDiscarded = channel.read(discardBuffer);
+ while (bytesDiscarded > 0) {
+ discardBuffer.clear();
+ bytesDiscarded = channel.read(discardBuffer);
+ }
+ engine.closeInbound();
+ } else {
+ // the bytes were not a closure message: keep what was read and report the channel as open
+ streamInManager.compact();
+ return false;
+ }
+ } catch (IOException e) {
+ // a failure while unwrapping or draining falls through to the close-down below
+ logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
+ if (logger.isDebugEnabled()) {
+ logger.error("", e);
+ }
+ }
+ }
+ // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
+ // so go ahead and close down the channel
+ closeQuietly(channel.socket());
+ closeQuietly(channel);
+ closed = true;
+ return true;
+ }
+
+ /**
+ * Closes this channel. The outbound side of the SSLEngine is closed and the closing message it produces
+ * is written to the peer. Whether or not that succeeds, any bytes still arriving on the socket are read
+ * and discarded without being unwrapped, the socket and channel are closed, and the channel is marked
+ * closed. Calling this method on an already-closed channel does nothing.
+ *
+ * @throws IOException if the engine does not report CLOSED when producing the closing message, or if
+ * that message cannot be written
+ */
+ @Override
+ public void close() throws IOException {
+ logger.debug("{} Closing Connection", this);
+ if (channel == null) {
+ return;
+ }
+
+ if (closed) {
+ return;
+ }
+
+ try {
+ engine.closeOutbound();
+
+ // wrapping an empty message after closeOutbound() makes the engine emit its closing message
+ final byte[] emptyMessage = new byte[0];
+
+ final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+ final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
+
+ if (handshakeResult.getStatus() != Status.CLOSED) {
+ throw new IOException("Invalid close state - will not send network data");
+ }
+
+ final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+ writeFully(readableStreamOut);
+ } finally {
+ // Drain the incoming TCP buffer
+ final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+ try {
+ int bytesDiscarded = channel.read(discardBuffer);
+ while (bytesDiscarded > 0) {
+ discardBuffer.clear();
+ bytesDiscarded = channel.read(discardBuffer);
+ }
+ } catch (Exception e) {
+ // best effort only: a failure while draining must not prevent the close below
+ }
+
+ closeQuietly(channel.socket());
+ closeQuietly(channel);
+ closed = true;
+ }
+ }
+
+ /**
+ * Copies already-decrypted bytes from the application data buffer into the caller's array.
+ *
+ * @param buffer the destination array
+ * @param offset the index in the destination at which to start writing
+ * @param len the maximum number of bytes to copy
+ * @return the number of bytes copied, or 0 if the application data buffer holds nothing
+ */
+ private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
+ final ByteBuffer decrypted = appDataManager.prepareForRead(1);
+
+ final int availableBefore = decrypted.remaining();
+ if (availableBefore <= 0) {
+ return 0;
+ }
+
+ final int bytesToCopy = Math.min(len, availableBefore);
+ decrypted.get(buffer, offset, bytesToCopy);
+
+ // derive the count from the buffer position so the log shows both requested and actual amounts
+ final int bytesCopied = availableBefore - decrypted.remaining();
+ logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
+ this, bytesToCopy, bytesCopied);
+ return bytesCopied;
+ }
+
+ /**
+ * Returns the number of bytes currently buffered by this channel, counting both decrypted application
+ * data and still-encrypted data received from the socket. If nothing is buffered, the socket is polled
+ * once through {@link #isDataAvailable()} before answering.
+ *
+ * @return the number of buffered bytes, or 0 if none are buffered and the socket had nothing to read
+ * @throws IOException if polling the socket fails
+ */
+ public int available() throws IOException {
+ final int alreadyBuffered = countBufferedBytes();
+ if (alreadyBuffered > 0) {
+ return alreadyBuffered;
+ }
+
+ return isDataAvailable() ? countBufferedBytes() : 0;
+ }
+
+ /**
+ * Sums the readable bytes of the application data buffer and of the inbound stream buffer.
+ */
+ private int countBufferedBytes() {
+ final ByteBuffer decrypted = appDataManager.prepareForRead(1);
+ final ByteBuffer encrypted = streamInManager.prepareForRead(1);
+ return decrypted.remaining() + encrypted.remaining();
+ }
+
+ /**
+ * Reports whether any data is available. If neither the application data buffer nor the inbound stream
+ * buffer holds bytes, a single read of the socket is attempted and any bytes obtained are kept in the
+ * inbound stream buffer.
+ *
+ * @return true if data is buffered or the socket read returned at least one byte
+ * @throws IOException if reading from the socket fails
+ */
+ public boolean isDataAvailable() throws IOException {
+ final ByteBuffer decrypted = appDataManager.prepareForRead(1);
+ final ByteBuffer encrypted = streamInManager.prepareForRead(1);
+
+ final boolean nothingBuffered = decrypted.remaining() <= 0 && encrypted.remaining() <= 0;
+ if (!nothingBuffered) {
+ return true;
+ }
+
+ final int packetBufferSize = engine.getSession().getPacketBufferSize();
+ final ByteBuffer target = streamInManager.prepareForWrite(packetBufferSize);
+ return channel.read(target) > 0;
+ }
+
+ /**
+ * Reads a single byte of decrypted data.
+ *
+ * @return the byte read as a value from 0 to 255, or -1 if the end of the stream was reached
+ * @throws IOException if the read fails
+ */
+ public int read() throws IOException {
+ final boolean endOfStream = (read(oneByteBuffer) == -1);
+ return endOfStream ? -1 : (oneByteBuffer[0] & 0xFF);
+ }
+
+ /**
+ * Reads decrypted data into the whole of the given array; equivalent to
+ * {@code read(buffer, 0, buffer.length)}.
+ *
+ * @param buffer the array to read into
+ * @return the value returned by {@link #read(byte[], int, int)}
+ * @throws IOException if the read fails
+ */
+ public int read(final byte[] buffer) throws IOException {
+ return read(buffer, 0, buffer.length);
+ }
+
+ /**
+ * Reads decrypted application data into the given array, connecting first if the channel is not yet
+ * connected. Data left over from an earlier unwrap is returned first; otherwise encrypted data is
+ * unwrapped, reading more from the socket whenever the engine needs more input.
+ *
+ * @param buffer the destination array
+ * @param offset the index in the destination at which to start writing
+ * @param len the maximum number of bytes to read
+ * @return the number of bytes copied into the array; 0 only if {@code len} is 0; -1 if the end of the
+ * stream was reached while reading more data from the socket
+ * @throws SSLHandshakeException if the engine reports a buffer overflow while unwrapping
+ * @throws IOException if the channel is closed, an unwrap yields no application data, or socket I/O fails
+ */
+ public int read(final byte[] buffer, final int offset, final int len) throws IOException {
+ logger.debug("{} Reading up to {} bytes of data", this, len);
+
+ if (!connected) {
+ connect();
+ }
+
+ // A zero-length request can never be satisfied by the loop below: nothing would be copied, the
+ // application data manager would be cleared, and the call would end in "Failed to decrypt data".
+ // Return 0 instead, as the InputStream contract requires for len == 0.
+ if (len == 0) {
+ return 0;
+ }
+
+ int copied = copyFromAppDataBuffer(buffer, offset, len);
+ if (copied > 0) {
+ return copied;
+ }
+
+ appDataManager.clear();
+
+ while (true) {
+ // prepare buffers and call unwrap
+ final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+ SSLEngineResult unwrapResponse = null;
+ final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+ unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+ logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse);
+
+ switch (unwrapResponse.getStatus()) {
+ case BUFFER_OVERFLOW:
+ throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
+ case BUFFER_UNDERFLOW: {
+ // the engine needs more encrypted input: read from the socket and try the unwrap again
+ final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+ final int bytesRead = readData(writableInBuffer);
+ if (bytesRead < 0) {
+ return -1;
+ }
+
+ continue;
+ }
+ case CLOSED:
+ throw new IOException("Channel is closed");
+ case OK: {
+ copied = copyFromAppDataBuffer(buffer, offset, len);
+ if (copied == 0) {
+ throw new IOException("Failed to decrypt data");
+ }
+ streamInManager.compact();
+ return copied;
+ }
+ }
+ }
+ }
+
+ /**
+ * Encrypts and writes a single byte.
+ *
+ * @param data the value to write; it is cast to a byte, so only its low-order eight bits are sent
+ * @throws IOException if the write fails
+ */
+ public void write(final int data) throws IOException {
+ write(new byte[]{(byte) data}, 0, 1);
+ }
+
+ /**
+ * Encrypts and writes the whole of the given array; equivalent to {@code write(data, 0, data.length)}.
+ *
+ * @param data the bytes to write
+ * @throws IOException if the write fails
+ */
+ public void write(final byte[] data) throws IOException {
+ write(data, 0, data.length);
+ }
+
+ /**
+ * Encrypts and writes {@code len} bytes of the given array, starting at {@code offset}, connecting first
+ * if the channel is not yet connected. The data is processed in chunks of at most
+ * {@link #MAX_WRITE_SIZE} bytes.
+ *
+ * @param data the array holding the bytes to write
+ * @param offset the index of the first byte to write
+ * @param len the number of bytes to write
+ * @throws IOException if the channel is closed or the data cannot be encrypted or written
+ */
+ public void write(final byte[] data, final int offset, final int len) throws IOException {
+ logger.debug("{} Writing {} bytes of data", this, len);
+
+ if (!connected) {
+ connect();
+ }
+
+ int iterations = len / MAX_WRITE_SIZE;
+ if (len % MAX_WRITE_SIZE > 0) {
+ iterations++;
+ }
+
+ for (int i = 0; i < iterations; i++) {
+ streamOutManager.clear();
+ final int bytesAlreadyHandled = i * MAX_WRITE_SIZE;
+ final int itrOffset = offset + bytesAlreadyHandled;
+ // The remaining count is relative to len, not to the absolute array index. Subtracting itrOffset
+ // (which includes offset) truncated the write, or produced a negative length, for offset != 0.
+ final int itrLen = Math.min(len - bytesAlreadyHandled, MAX_WRITE_SIZE);
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
+
+ final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
+ final Status status = encryptAndWriteFully(buffMan);
+ switch (status) {
+ case BUFFER_OVERFLOW:
+ // NOTE(review): after growing the buffers the loop moves on to the next chunk; it looks as if
+ // the unsent remainder of this chunk is not retried - confirm against BufferStateManager.
+ streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+ appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+ continue;
+ case OK:
+ continue;
+ case CLOSED:
+ throw new IOException("Channel is closed");
+ case BUFFER_UNDERFLOW:
+ throw new AssertionError("Got Buffer Underflow but should not have...");
+ }
+ }
+ }
+
+ /**
+ * Marks this channel as interrupted. The flag is checked by the connect, read and write loops, which
+ * then throw a TransmissionDisabledException. The flag is never reset by this class.
+ */
+ public void interrupt() {
+ this.interrupted = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..ca6de85
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream view of an {@link SSLSocketChannel}: every operation is delegated to the wrapped channel.
+ */
+public class SSLSocketChannelInputStream extends InputStream {
+
+ // the channel all reads are delegated to
+ private final SSLSocketChannel channel;
+
+ /**
+ * @param channel the channel to read from
+ */
+ public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+ this.channel = channel;
+ }
+
+ /**
+ * Delegates to {@link SSLSocketChannel#consume()}.
+ *
+ * @throws IOException if the channel fails to consume its input
+ */
+ public void consume() throws IOException {
+ channel.consume();
+ }
+
+ /**
+ * Reads a single byte from the channel.
+ *
+ * @return the value returned by {@link SSLSocketChannel#read()}
+ * @throws IOException if the channel read fails
+ */
+ @Override
+ public int read() throws IOException {
+ return channel.read();
+ }
+
+ /**
+ * Reads from the channel into the whole of the given array.
+ *
+ * @param b the array to read into
+ * @return the value returned by {@link SSLSocketChannel#read(byte[])}
+ * @throws IOException if the channel read fails
+ */
+ @Override
+ public int read(final byte[] b) throws IOException {
+ return channel.read(b);
+ }
+
+ /**
+ * Reads up to {@code len} bytes from the channel into the given array, starting at {@code off}.
+ *
+ * @param b the array to read into
+ * @param off the index at which to start writing into the array
+ * @param len the maximum number of bytes to read
+ * @return the value returned by {@link SSLSocketChannel#read(byte[], int, int)}
+ * @throws IOException if the channel read fails
+ */
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return channel.read(b, off, len);
+ }
+
+ /**
+ * Closes the underlying SSLSocketChannel, which will also close the OutputStream and connection
+ */
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ /**
+ * @return the value returned by {@link SSLSocketChannel#available()}
+ * @throws IOException if the channel fails while checking for data
+ */
+ @Override
+ public int available() throws IOException {
+ return channel.available();
+ }
+
+ /**
+ * @return true if {@link #available()} reports at least one byte
+ * @throws IOException if the channel fails while checking for data
+ */
+ public boolean isDataAvailable() throws IOException {
+ return available() > 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..262cf54
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An OutputStream view of an {@link SSLSocketChannel}: every operation is delegated to the wrapped channel.
+ */
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+ // the channel all writes are delegated to
+ private final SSLSocketChannel channel;
+
+ /**
+ * @param channel the channel to write to
+ */
+ public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+ this.channel = channel;
+ }
+
+ /**
+ * Writes a single byte to the channel.
+ *
+ * @param b the value to write, passed unchanged to {@link SSLSocketChannel#write(int)}
+ * @throws IOException if the channel write fails
+ */
+ @Override
+ public void write(final int b) throws IOException {
+ channel.write(b);
+ }
+
+ /**
+ * Writes the whole of the given array to the channel.
+ *
+ * @param b the bytes to write
+ * @throws IOException if the channel write fails
+ */
+ @Override
+ public void write(byte[] b) throws IOException {
+ channel.write(b);
+ }
+
+ /**
+ * Writes {@code len} bytes of the given array to the channel, starting at {@code off}.
+ *
+ * @param b the array holding the bytes to write
+ * @param off the index of the first byte to write
+ * @param len the number of bytes to write
+ * @throws IOException if the channel write fails
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ channel.write(b, off, len);
+ }
+
+ /**
+ * Closes the underlying SSLSocketChannel, which also will close the InputStream and the connection
+ */
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml
index 0187a04..fd8d33f 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -38,11 +38,17 @@
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
- <version>1.9.13</version>
</dependency>
-
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-client-dto</artifactId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-socket-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/pom.xml b/nifi-commons/nifi-socket-utils/pom.xml
index 5328b73..a01284b 100644
--- a/nifi-commons/nifi-socket-utils/pom.xml
+++ b/nifi-commons/nifi-socket-utils/pom.xml
@@ -29,7 +29,7 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-utils</artifactId>
+ <artifactId>nifi-security-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/pom.xml b/nifi-commons/nifi-utils/pom.xml
index be3ac84..f5f261f 100644
--- a/nifi-commons/nifi-utils/pom.xml
+++ b/nifi-commons/nifi-utils/pom.xml
@@ -23,13 +23,21 @@
<artifactId>nifi-utils</artifactId>
<version>1.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
- <!--
- This project intentionally minimizes dependencies beyond that pulled in by the parent. It is a general purpose utility library and should keep its surface/tension minimal.
- -->
+ <description>
+ This nifi-utils module should be a general purpose place to store widely
+ and generally useful functions that any component might want to leverage.
+ NO DEPENDENCIES should be added. This module is likely to be leveraged by
+ every extension and should not bring along any other dependencies. The only
+ dependency intended is the nifi-api and even this is expected to be already
+ provided in any case where it would be used. The typical place this util
+ would be found is within a nar and all nars already have nifi-api as a parent
+ dependency. The nifi-api can be thought of as a NiFi Application Container level
+ dependency.
+ </description>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security-utils</artifactId>
+ <artifactId>nifi-api</artifactId>
</dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
new file mode 100644
index 0000000..2d1a407
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public class FlowFileFilters {
+
+ /**
+ * Creates a stateful {@link FlowFileFilter} that keeps accepting FlowFiles until accepting the next one
+ * would push the accumulated size past the maximum size, or the number of accepted FlowFiles past the
+ * maximum count. The count limit is important because FlowFiles may be 0 bytes, in which case the size
+ * limit alone would never be reached. The first FlowFile offered is always accepted, even if it exceeds
+ * the maximum size on its own; in that case no other FlowFile will be accepted.
+ *
+ * @param maxSize the maximum size of the group of FlowFiles
+ * @param unit the unit of the <code>maxSize</code> argument
+ * @param maxCount the maximum number of FlowFiles to pull
+ * @return filter
+ */
+ public static FlowFileFilter newSizeBasedFilter(final double maxSize, final DataUnit unit, final int maxCount) {
+ final double maxBytes = DataUnit.B.convert(maxSize, unit);
+
+ return new FlowFileFilter() {
+ int acceptedCount = 0;
+ long acceptedBytes = 0L;
+
+ @Override
+ public FlowFileFilterResult filter(final FlowFile flowFile) {
+ // the limits only apply from the second FlowFile on; the first one is always taken
+ final boolean firstFlowFile = (acceptedCount == 0);
+ if (!firstFlowFile) {
+ final boolean limitReached = (acceptedBytes + flowFile.getSize() > maxBytes) || (acceptedCount + 1 > maxCount);
+ if (limitReached) {
+ return FlowFileFilterResult.REJECT_AND_TERMINATE;
+ }
+ }
+
+ acceptedCount++;
+ acceptedBytes += flowFile.getSize();
+ return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+ }
+
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
new file mode 100644
index 0000000..a577bc8
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.UnsupportedCharsetException;
+import java.time.Instant;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.util.FormatUtils;
+
+public class StandardValidators {
+
+ //
+ //
+ // STATICALLY DEFINED VALIDATORS
+ //
+ //
+    /**
+     * {@link Validator} that checks the property value with
+     * {@code FlowFile.KeyValidator.validateKey}. A value containing Expression
+     * Language is accepted as-is when the property supports it.
+     */
+    public static final Validator ATTRIBUTE_KEY_VALIDATOR = (subject, input, context) -> {
+        final ValidationResult.Builder result = new ValidationResult.Builder();
+        result.subject(subject).input(input);
+
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input);
+        if (expressionPresent) {
+            return result.valid(true).explanation("Contains Expression Language").build();
+        }
+
+        try {
+            FlowFile.KeyValidator.validateKey(input);
+        } catch (final IllegalArgumentException invalidKey) {
+            // The key validator signals rejection by throwing; its message becomes the explanation.
+            result.valid(false).explanation(invalidKey.getMessage());
+            return result.build();
+        }
+
+        result.valid(true);
+        return result.build();
+    };
+
+    /**
+     * {@link Validator} that checks the property <em>name</em> (the subject),
+     * not the property value, with {@code FlowFile.KeyValidator.validateKey}.
+     * The result is reported against the subject "Property Name". A value
+     * containing Expression Language is accepted as-is when the property
+     * supports it.
+     */
+    public static final Validator ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR = (subject, input, context) -> {
+        final ValidationResult.Builder result = new ValidationResult.Builder();
+        result.subject("Property Name").input(subject);
+
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input);
+        if (expressionPresent) {
+            return result.valid(true).explanation("Contains Expression Language").build();
+        }
+
+        try {
+            FlowFile.KeyValidator.validateKey(subject);
+        } catch (final IllegalArgumentException invalidKey) {
+            // The key validator signals rejection by throwing; its message becomes the explanation.
+            result.valid(false).explanation(invalidKey.getMessage());
+            return result.build();
+        }
+
+        result.valid(true);
+        return result.build();
+    };
+
+    /**
+     * {@link Validator} that accepts values parseable as a 32-bit integer
+     * strictly greater than zero. A value containing Expression Language is
+     * accepted as-is when the property supports it.
+     */
+    public static final Validator POSITIVE_INTEGER_VALIDATOR = (subject, value, context) -> {
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+        if (expressionPresent) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        // A null problem means the value passed every check.
+        String problem;
+        try {
+            problem = Integer.parseInt(value) > 0 ? null : "not a positive value";
+        } catch (final NumberFormatException notANumber) {
+            problem = "not a valid integer";
+        }
+
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(problem).valid(problem == null).build();
+    };
+
+    /**
+     * {@link Validator} that accepts values parseable as a 64-bit integer
+     * strictly greater than zero. A value containing Expression Language is
+     * accepted as-is when the property supports it.
+     */
+    public static final Validator POSITIVE_LONG_VALIDATOR = (subject, value, context) -> {
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+        if (expressionPresent) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        // A null problem means the value passed every check.
+        String problem;
+        try {
+            problem = Long.parseLong(value) > 0 ? null : "not a positive value";
+        } catch (final NumberFormatException notANumber) {
+            problem = "not a valid 64-bit integer";
+        }
+
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(problem).valid(problem == null).build();
+    };
+
+ /**
+ * {@link Validator} for port numbers, built with
+ * {@code createLongValidator(1, 65535, true)}: the value must parse as a
+ * long between 1 and 65535, both bounds inclusive.
+ */
+ public static final Validator PORT_VALIDATOR = createLongValidator(1, 65535, true);
+
+    /**
+     * {@link Validator} that requires a non-null value with length &gt; 0.
+     * Whitespace-only values are accepted.
+     */
+    public static final Validator NON_EMPTY_VALIDATOR = (subject, value, context) -> {
+        final boolean hasContent = !(value == null || value.isEmpty());
+        return new ValidationResult.Builder()
+                .subject(subject)
+                .input(value)
+                .valid(hasContent)
+                .explanation(subject + " cannot be empty")
+                .build();
+    };
+
+    /**
+     * {@link Validator} that requires a non-null value containing at least
+     * one character that survives {@link String#trim()}.
+     */
+    public static final Validator NON_BLANK_VALIDATOR = (subject, value, context) -> {
+        final boolean hasVisibleContent = !(value == null || value.trim().isEmpty());
+        final String explanation = subject + " must contain at least one character that is not white space";
+        return new ValidationResult.Builder()
+                .subject(subject)
+                .input(value)
+                .valid(hasVisibleContent)
+                .explanation(explanation)
+                .build();
+    };
+
+    /**
+     * {@link Validator} that accepts only "true" or "false", ignoring case.
+     * A value containing Expression Language is accepted as-is when the
+     * property supports it.
+     */
+    public static final Validator BOOLEAN_VALIDATOR = (subject, value, context) -> {
+        final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(value);
+
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+            return result.explanation("Expression Language Present").valid(true).build();
+        }
+
+        // Boolean.parseBoolean is true only for a non-null, case-insensitive "true".
+        final boolean isBooleanLiteral = Boolean.parseBoolean(value) || "false".equalsIgnoreCase(value);
+        if (isBooleanLiteral) {
+            return result.valid(true).explanation(null).build();
+        }
+        return result.valid(false).explanation("Value must be 'true' or 'false'").build();
+    };
+
+    /**
+     * {@link Validator} that accepts any value parseable by
+     * {@link Integer#parseInt(String)}. A value containing Expression
+     * Language is accepted as-is when the property supports it.
+     */
+    public static final Validator INTEGER_VALIDATOR = (subject, value, context) -> {
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+        if (expressionPresent) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        boolean parsed;
+        try {
+            Integer.parseInt(value);
+            parsed = true;
+        } catch (final NumberFormatException notANumber) {
+            parsed = false;
+        }
+
+        final String problem = parsed ? null : "not a valid integer";
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(problem).valid(parsed).build();
+    };
+
+    /**
+     * {@link Validator} that accepts any value parseable by
+     * {@link Long#parseLong(String)}. A value containing Expression Language
+     * is accepted as-is when the property supports it.
+     */
+    public static final Validator LONG_VALIDATOR = (subject, value, context) -> {
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+        if (expressionPresent) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        boolean parsed;
+        try {
+            Long.parseLong(value);
+            parsed = true;
+        } catch (final NumberFormatException notANumber) {
+            parsed = false;
+        }
+
+        final String problem = parsed ? null : "not a valid Long";
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(problem).valid(parsed).build();
+    };
+
+    /**
+     * {@link Validator} that accepts any value parseable by
+     * {@link Instant#parse(CharSequence)}. Unlike most validators in this
+     * class it performs no Expression Language check.
+     */
+    public static final Validator ISO8061_INSTANT_VALIDATOR = (subject, input, context) -> {
+        boolean parsed;
+        try {
+            Instant.parse(input);
+            parsed = true;
+        } catch (final Exception unparseable) {
+            // Any failure, including a null input, marks the value invalid.
+            parsed = false;
+        }
+
+        final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
+        if (parsed) {
+            return result.explanation("Valid ISO8061 Instant Date").valid(true).build();
+        }
+        return result.explanation("Not a valid ISO8061 Instant Date, please enter in UTC time").valid(false).build();
+    };
+
+    /**
+     * {@link Validator} that accepts values parseable as a 32-bit integer
+     * greater than or equal to zero. A value containing Expression Language
+     * is accepted as-is when the property supports it.
+     */
+    public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = (subject, value, context) -> {
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+        if (expressionPresent) {
+            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+        }
+
+        // A null problem means the value passed every check.
+        String problem;
+        try {
+            problem = Integer.parseInt(value) >= 0 ? null : "value is negative";
+        } catch (final NumberFormatException notANumber) {
+            problem = "value is not a valid integer";
+        }
+
+        return new ValidationResult.Builder().subject(subject).input(value).explanation(problem).valid(problem == null).build();
+    };
+
+    /**
+     * {@link Validator} that accepts the name of a character set supported by
+     * the running JVM. When the property supports Expression Language and the
+     * value contains it, the value is accepted only if the expression's
+     * result type is {@code STRING}.
+     */
+    public static final Validator CHARACTER_SET_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                final ResultType resultType = context.newExpressionLanguageCompiler().getResultType(value);
+                if (!resultType.equals(ResultType.STRING)) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(value)
+                            .valid(false)
+                            .explanation("Expected Attribute Query to return type " + ResultType.STRING + " but query returns type " + resultType)
+                            .build();
+                }
+
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            if (value == null) {
+                // Checked up front so the catch below only has to deal with illegal names.
+                reason = "Character Set value cannot be null.";
+            } else {
+                try {
+                    if (!Charset.isSupported(value)) {
+                        reason = "Character Set is not supported by this JVM.";
+                    }
+                } catch (final UnsupportedCharsetException uce) {
+                    reason = "Character Set is not supported by this JVM.";
+                } catch (final IllegalArgumentException iae) {
+                    // Charset.isSupported throws IllegalCharsetNameException (an
+                    // IllegalArgumentException) for names that are not legal, e.g. "" or "utf 8".
+                    reason = "Character Set name is not a legal charset name.";
+                }
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+ /**
+ * URL Validator built by {@code createURLValidator()}. A value containing
+ * the Expression Language is accepted without further checks when the
+ * property supports the Expression Language; otherwise the value is
+ * evaluated and must be parseable as a {@link URL}.
+ */
+ public static final Validator URL_VALIDATOR = createURLValidator();
+
+    /**
+     * {@link Validator} that accepts any value from which a {@link URI} can
+     * be constructed. A value containing Expression Language is accepted
+     * as-is when the property supports it.
+     */
+    public static final Validator URI_VALIDATOR = (subject, input, context) -> {
+        final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
+
+        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+            return result.explanation("Expression Language Present").valid(true).build();
+        }
+
+        boolean parsed;
+        try {
+            new URI(input);
+            parsed = true;
+        } catch (final Exception unparseable) {
+            // Any failure, including a null input, marks the value invalid.
+            parsed = false;
+        }
+
+        return parsed
+                ? result.explanation("Valid URI").valid(true).build()
+                : result.explanation("Not a valid URI").valid(false).build();
+    };
+
+ /**
+ * {@link Validator} built with
+ * {@code createRegexValidator(0, Integer.MAX_VALUE, false)}: the value must
+ * compile as a Java regular expression; the capturing-group bounds are 0 and
+ * {@code Integer.MAX_VALUE}, and the value is not evaluated as Expression
+ * Language before compiling.
+ */
+ public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
+
+    /**
+     * {@link Validator} that syntax-checks any Expression Language found in
+     * the value (extra characters around the expression are allowed). Values
+     * without Expression Language, or for properties that do not support it,
+     * are always valid.
+     */
+    public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = (subject, input, context) -> {
+        final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input);
+        if (!expressionPresent) {
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+
+        String syntaxProblem;
+        try {
+            syntaxProblem = context.newExpressionLanguageCompiler().validateExpression(input, true);
+        } catch (final Exception compileFailure) {
+            return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(compileFailure.getMessage()).build();
+        }
+
+        // A null or empty result from the compiler means no syntax problem was found.
+        if (isEmpty(syntaxProblem)) {
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(syntaxProblem).build();
+    };
+
+    /**
+     * Tells whether a string carries no characters at all.
+     *
+     * @param value the string to inspect; may be null
+     * @return true if value is null or has length zero; the value is not
+     * trimmed first, so whitespace-only strings are not considered empty
+     */
+    private static boolean isEmpty(final String value) {
+        if (value == null) {
+            return true;
+        }
+        return value.isEmpty();
+    }
+
+    /**
+     * {@link Validator} that accepts values matching
+     * {@code FormatUtils.TIME_DURATION_REGEX} after lower-casing. A value
+     * containing Expression Language is accepted as-is when the property
+     * supports it; a null value is rejected.
+     */
+    public static final Validator TIME_PERIOD_VALIDATOR = new Validator() {
+        // Compiled once instead of on every validation call.
+        private final Pattern timeDurationPattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
+            }
+
+            // Locale.ROOT keeps the case mapping independent of the default locale
+            // (e.g. under a Turkish locale "MINS" would otherwise lower-case to "m\u0131ns").
+            if (timeDurationPattern.matcher(input.toLowerCase(Locale.ROOT)).matches()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } else {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation("Must be of format <duration> <TimeUnit> where <duration> is a "
+                                + "non-negative integer and TimeUnit is a supported Time Unit, such "
+                                + "as: nanos, millis, secs, mins, hrs, days")
+                        .build();
+            }
+        }
+    };
+
+    /**
+     * {@link Validator} that accepts values matching
+     * {@code DataUnit.DATA_SIZE_REGEX} after upper-casing. A value containing
+     * Expression Language is accepted as-is when the property supports it; a
+     * null value is rejected.
+     */
+    public static final Validator DATA_SIZE_VALIDATOR = new Validator() {
+        // Compiled once instead of on every validation call.
+        private final Pattern dataSizePattern = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            if (input == null) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation("Data Size cannot be null")
+                        .build();
+            }
+
+            // Locale.ROOT keeps the case mapping independent of the default locale
+            // (e.g. under a Turkish locale "kib" would otherwise upper-case to "K\u0130B").
+            if (dataSizePattern.matcher(input.toUpperCase(Locale.ROOT)).matches()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } else {
+                return new ValidationResult.Builder()
+                        .subject(subject).input(input)
+                        .valid(false)
+                        .explanation("Must be of format <Data Size> <Data Unit> where <Data Size>"
+                                + " is a non-negative integer and <Data Unit> is a supported Data"
+                                + " Unit, such as: B, KB, MB, GB, TB")
+                        .build();
+            }
+        }
+    };
+
+ /**
+ * {@link Validator} that requires the value to name an existing file;
+ * a {@code FileExistsValidator} constructed with Expression Language
+ * evaluation enabled.
+ */
+ public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true);
+
+ //
+ //
+ // FACTORY METHODS FOR VALIDATORS
+ //
+ //
+    /**
+     * Creates a {@link DirectoryExistsValidator}.
+     *
+     * @param allowExpressionLanguage whether the value is evaluated as Expression Language before the check
+     * @param createDirectoryIfMissing whether the validator should try to create a missing directory
+     * @return validator
+     */
+    public static Validator createDirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean createDirectoryIfMissing) {
+        final Validator directoryValidator = new DirectoryExistsValidator(allowExpressionLanguage, createDirectoryIfMissing);
+        return directoryValidator;
+    }
+
+    /**
+     * Creates a {@link Validator} that accepts values parseable as a
+     * {@link URL} after attribute expressions have been evaluated. A value
+     * containing Expression Language is accepted as-is when the property
+     * supports it.
+     *
+     * @return validator
+     */
+    private static Validator createURLValidator() {
+        return (subject, input, context) -> {
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
+
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return result.explanation("Expression Language Present").valid(true).build();
+            }
+
+            boolean parsed;
+            try {
+                final String evaluatedInput = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+                new URL(evaluatedInput);
+                parsed = true;
+            } catch (final Exception unparseable) {
+                // Evaluation failures and malformed URLs are both reported as an invalid URL.
+                parsed = false;
+            }
+
+            return parsed
+                    ? result.explanation("Valid URL").valid(true).build()
+                    : result.explanation("Not a valid URL").valid(false).build();
+        };
+    }
+
+    /**
+     * Creates a {@link Validator} that accepts a value that either parses as
+     * a {@link URL} or names an existing file. A value containing Expression
+     * Language is accepted as-is when the property supports it.
+     *
+     * @return validator
+     */
+    public static Validator createURLorFileValidator() {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+
+                try {
+                    final PropertyValue propertyValue = context.newPropertyValue(input);
+                    final String candidate;
+                    if (propertyValue == null) {
+                        candidate = input;
+                    } else {
+                        candidate = propertyValue.evaluateAttributeExpressions().getValue();
+                    }
+
+                    boolean acceptable;
+                    try {
+                        // First preference: the value is a well-formed URL.
+                        new URL(candidate);
+                        acceptable = true;
+                    } catch (final MalformedURLException notAUrl) {
+                        // Otherwise it must point at something that exists on disk.
+                        acceptable = new File(candidate).exists();
+                    }
+
+                    final String explanation = acceptable ? "Valid URL or file" : "Not a valid URL or file";
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation(explanation).valid(acceptable).build();
+                } catch (final Exception unexpected) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URL or file").valid(false).build();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates a {@link Validator} for comma-separated lists. The value is
+     * split on "," and every entry is handed to the supplied validator; the
+     * first invalid entry's result is returned unchanged. A value containing
+     * Expression Language is accepted as-is when the property supports it; a
+     * null value is rejected.
+     *
+     * @param trimEntries whether each entry is trimmed before validation
+     * @param excludeEmptyEntries whether empty entries are skipped instead of validated
+     * @param validator the validator applied to each entry
+     * @return validator
+     */
+    public static Validator createListValidator(boolean trimEntries, boolean excludeEmptyEntries, Validator validator) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+                try {
+                    if (input == null) {
+                        return new ValidationResult.Builder().subject(subject).input(null).explanation("List must have at least one non-empty element").valid(false).build();
+                    }
+
+                    for (final String rawEntry : input.split(",")) {
+                        final String entry = trimEntries ? rawEntry.trim() : rawEntry;
+                        if (excludeEmptyEntries && isEmpty(entry)) {
+                            continue;
+                        }
+
+                        final ValidationResult entryResult = validator.validate(subject, entry, context);
+                        if (!entryResult.isValid()) {
+                            return entryResult;
+                        }
+                    }
+
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid List").valid(true).build();
+                } catch (final Exception unexpected) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid list").valid(false).build();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates a {@link TimePeriodValidator} bounded by the given minimum and
+     * maximum durations.
+     *
+     * @param minTime the minimum duration
+     * @param minTimeUnit the unit of <code>minTime</code>
+     * @param maxTime the maximum duration
+     * @param maxTimeUnit the unit of <code>maxTime</code>
+     * @return validator
+     */
+    public static Validator createTimePeriodValidator(final long minTime, final TimeUnit minTimeUnit, final long maxTime, final TimeUnit maxTimeUnit) {
+        final Validator boundedValidator = new TimePeriodValidator(minTime, minTimeUnit, maxTime, maxTimeUnit);
+        return boundedValidator;
+    }
+
+    /**
+     * Creates an Expression Language validator for the given result type that
+     * allows extra characters around the expression.
+     *
+     * @param expectedResultType the result type the expression must produce
+     * @return validator
+     */
+    public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType) {
+        final boolean allowExtraCharacters = true;
+        return createAttributeExpressionLanguageValidator(expectedResultType, allowExtraCharacters);
+    }
+
+    /**
+     * Creates a {@link Validator} that first applies
+     * {@code DATA_SIZE_VALIDATOR} and then requires the parsed size, in
+     * bytes, to lie within the given bounds. A value containing Expression
+     * Language is accepted as-is when the property supports it.
+     *
+     * @param minBytesInclusive the smallest acceptable size in bytes
+     * @param maxBytesInclusive the largest acceptable size in bytes
+     * @return validator
+     */
+    public static Validator createDataSizeBoundsValidator(final long minBytesInclusive, final long maxBytesInclusive) {
+        return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            // Reject anything that is not a well-formed data size before parsing it.
+            final ValidationResult syntaxResult = DATA_SIZE_VALIDATOR.validate(subject, input, context);
+            if (!syntaxResult.isValid()) {
+                return syntaxResult;
+            }
+
+            final long sizeInBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
+            if (sizeInBytes < minBytesInclusive) {
+                return result.valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build();
+            }
+            if (sizeInBytes > maxBytesInclusive) {
+                return result.valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build();
+            }
+            return result.valid(true).build();
+        };
+    }
+
+    /**
+     * Creates a {@link Validator} that requires the entire value to match the
+     * given pattern. A value containing Expression Language is accepted as-is
+     * when the property supports it.
+     *
+     * @param pattern the pattern the value must match
+     * @return validator
+     */
+    public static Validator createRegexMatchingValidator(final Pattern pattern) {
+        return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            final boolean matches = pattern.matcher(input).matches();
+            final String explanation;
+            if (matches) {
+                explanation = null;
+            } else {
+                explanation = "Value does not match regular expression: " + pattern.pattern();
+            }
+
+            return new ValidationResult.Builder()
+                    .input(input)
+                    .subject(subject)
+                    .valid(matches)
+                    .explanation(explanation)
+                    .build();
+        };
+    }
+
+    /**
+     * Creates a {@link Validator} that ensures a value is a valid Java
+     * Regular Expression with at least <code>minCapturingGroups</code> and at
+     * most <code>maxCapturingGroups</code> capturing groups. If
+     * <code>supportAttributeExpressionLanguage</code> is <code>true</code>,
+     * the value is first evaluated as Expression Language and the result of
+     * that evaluation is what gets compiled as the Regular Expression.
+     *
+     * @param minCapturingGroups minimum capturing groups allowed
+     * @param maxCapturingGroups maximum capturing groups allowed
+     * @param supportAttributeExpressionLanguage whether or not to evaluate
+     * the value as expression language before compiling it
+     * @return validator
+     */
+    public static Validator createRegexValidator(final int minCapturingGroups, final int maxCapturingGroups, final boolean supportAttributeExpressionLanguage) {
+        return (subject, value, context) -> {
+            try {
+                String regex = value;
+                if (supportAttributeExpressionLanguage) {
+                    try {
+                        regex = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                    } catch (final Exception evaluationFailure) {
+                        return new ValidationResult.Builder()
+                                .subject(subject)
+                                .input(value)
+                                .valid(false)
+                                .explanation("Failed to evaluate the Attribute Expression Language due to " + evaluationFailure.toString())
+                                .build();
+                    }
+                }
+
+                // Matching against an empty string is enough to learn the group count.
+                final int numGroups = Pattern.compile(regex).matcher("").groupCount();
+                final boolean withinBounds = numGroups >= minCapturingGroups && numGroups <= maxCapturingGroups;
+                if (withinBounds) {
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
+                }
+
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(value)
+                        .valid(false)
+                        .explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + numGroups)
+                        .build();
+            } catch (final Exception compileFailure) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(value)
+                        .valid(false)
+                        .explanation("Not a valid Java Regular Expression")
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Creates a {@link Validator} that syntax-checks the value as Expression
+     * Language and then compares the expression's result type with the
+     * expected one. When extra characters are allowed, the result type is
+     * taken to be {@code STRING} without consulting the compiler.
+     *
+     * @param expectedResultType the result type the expression must produce
+     * @param allowExtraCharacters whether characters outside the expression are permitted
+     * @return validator
+     */
+    public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType, final boolean allowExtraCharacters) {
+        return (subject, input, context) -> {
+            final String syntaxError = context.newExpressionLanguageCompiler().validateExpression(input, allowExtraCharacters);
+            if (syntaxError != null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(syntaxError).build();
+            }
+
+            final ResultType actualResultType;
+            if (allowExtraCharacters) {
+                actualResultType = ResultType.STRING;
+            } else {
+                actualResultType = context.newExpressionLanguageCompiler().getResultType(input);
+            }
+
+            if (actualResultType.equals(expectedResultType)) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(input)
+                    .valid(false)
+                    .explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + actualResultType)
+                    .build();
+        };
+    }
+
+    /**
+     * Creates a {@link Validator} that requires the value to parse as a long
+     * lying between <code>minimum</code> and <code>maximum</code>. A value
+     * containing Expression Language is accepted as-is when the property
+     * supports it.
+     *
+     * @param minimum the lower bound
+     * @param maximum the upper bound
+     * @param inclusive whether the bounds themselves are acceptable values
+     * @return validator
+     */
+    public static Validator createLongValidator(final long minimum, final long maximum, final boolean inclusive) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+
+                String reason = null;
+                try {
+                    final long longVal = Long.parseLong(input);
+                    // With exclusive bounds, the bounds themselves are out of range too.
+                    final boolean belowRange = longVal < minimum || (!inclusive && longVal == minimum);
+                    final boolean aboveRange = longVal > maximum || (!inclusive && longVal == maximum);
+                    if (belowRange || aboveRange) {
+                        reason = "Value must be between " + minimum + " and " + maximum + " (" + (inclusive ? "inclusive" : "exclusive") + ")";
+                    }
+                } catch (final NumberFormatException e) {
+                    reason = "not a valid integer";
+                }
+
+                return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(reason == null).build();
+            }
+
+        };
+    }
+
+ //
+ //
+ // SPECIFIC VALIDATOR IMPLEMENTATIONS THAT CANNOT BE ANONYMOUS CLASSES
+ //
+ //
+    /**
+     * {@link Validator} that requires a value matching
+     * {@code FormatUtils.TIME_DURATION_REGEX} whose duration, converted to
+     * nanoseconds, lies between the configured minimum and maximum (both
+     * inclusive). A value containing Expression Language is accepted as-is
+     * when the property supports it; a null value is rejected.
+     */
+    static class TimePeriodValidator implements Validator {
+
+        private final Pattern pattern;
+
+        private final long minNanos;
+        private final long maxNanos;
+
+        // Human-readable forms of the bounds, used only in the explanation text.
+        private final String minValueEnglish;
+        private final String maxValueEnglish;
+
+        public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) {
+            pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
+
+            this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit);
+            this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit);
+            this.minValueEnglish = minValue + " " + minTimeUnit.toString();
+            this.maxValueEnglish = maxValue + " " + maxTimeUnit.toString();
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
+            }
+
+            // Locale.ROOT keeps the case mapping independent of the default locale
+            // (e.g. under a Turkish locale "MINS" would otherwise lower-case to "m\u0131ns").
+            final String lowerCase = input.toLowerCase(Locale.ROOT);
+            final boolean validSyntax = pattern.matcher(lowerCase).matches();
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            if (validSyntax) {
+                final long nanos = FormatUtils.getTimeDuration(lowerCase, TimeUnit.NANOSECONDS);
+
+                if (nanos < minNanos || nanos > maxNanos) {
+                    builder.subject(subject).input(input).valid(false)
+                            .explanation("Must be in the range of " + minValueEnglish + " to " + maxValueEnglish);
+                } else {
+                    builder.subject(subject).input(input).valid(true);
+                }
+            } else {
+                builder.subject(subject).input(input).valid(false)
+                        .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative "
+                                + "integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days");
+            }
+            return builder.build();
+        }
+    }
+
+    /**
+     * {@link Validator} that requires the value to name a path that exists
+     * on the local file system. A value containing Expression Language is
+     * accepted as-is when the property supports it.
+     */
+    public static class FileExistsValidator implements Validator {
+
+        // When true, the value is evaluated as Expression Language before the existence check.
+        private final boolean allowEL;
+
+        public FileExistsValidator(final boolean allowExpressionLanguage) {
+            this.allowEL = allowExpressionLanguage;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+            if (expressionPresent) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String path = value;
+            if (allowEL) {
+                try {
+                    path = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                } catch (final Exception evaluationFailure) {
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(false)
+                            .explanation("Not a valid Expression Language value: " + evaluationFailure.getMessage()).build();
+                }
+            }
+
+            final File file = new File(path);
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(value);
+            if (file.exists()) {
+                return result.valid(true).explanation(null).build();
+            }
+            return result.valid(false).explanation("File " + file + " does not exist").build();
+        }
+    }
+
+    /**
+     * {@link Validator} that requires the value's length to lie between a
+     * minimum and a maximum, both inclusive.
+     */
+    public static class StringLengthValidator implements Validator {
+
+        private final int minimum;
+        private final int maximum;
+
+        public StringLengthValidator(int minimum, int maximum) {
+            this.minimum = minimum;
+            this.maximum = maximum;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            final int length = value.length();
+            final boolean withinBounds = length >= minimum && length <= maximum;
+
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(value);
+            if (withinBounds) {
+                return result.valid(true).build();
+            }
+
+            return result
+                    .valid(false)
+                    .explanation(String.format("String length invalid [min: %d, max: %d]", minimum, maximum))
+                    .build();
+        }
+    }
+
+    /**
+     * {@link Validator} that requires the value to name an existing
+     * directory. When configured to do so, a missing directory is created
+     * (via {@link File#mkdirs()}) as a side effect of validation. A value
+     * containing Expression Language is accepted as-is when the property
+     * supports it.
+     */
+    public static class DirectoryExistsValidator implements Validator {
+
+        // When true, the value is evaluated as Expression Language before the directory check.
+        private final boolean allowEL;
+        // When true, a missing directory is created instead of being reported.
+        private final boolean create;
+
+        public DirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean create) {
+            this.allowEL = allowExpressionLanguage;
+            this.create = create;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            final boolean expressionPresent = context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value);
+            if (expressionPresent) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String path = value;
+            if (allowEL) {
+                try {
+                    path = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                } catch (final Exception evaluationFailure) {
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(false)
+                            .explanation("Not a valid Expression Language value: " + evaluationFailure.getMessage()).build();
+                }
+
+                final boolean evaluatedToBlank = path.trim().isEmpty();
+                final boolean originalWasBlank = value.trim().isEmpty();
+                if (evaluatedToBlank && !originalWasBlank) {
+                    // User specified an Expression and nothing more... assume valid.
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
+                }
+            }
+
+            // A null problem means the directory check passed.
+            String problem = null;
+            try {
+                final File directory = new File(path);
+                if (directory.exists()) {
+                    if (!directory.isDirectory()) {
+                        problem = "Path does not point to a directory";
+                    }
+                } else if (!create) {
+                    problem = "Directory does not exist";
+                } else if (!directory.mkdirs()) {
+                    problem = "Directory does not exist and could not be created";
+                }
+            } catch (final Exception invalidPath) {
+                problem = "Value is not a valid directory name";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(problem).valid(problem == null).build();
+        }
+    }
+}