You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2016/11/21 21:31:08 UTC

[5/6] nifi git commit: NIFI-2954 This closes #1244. Moved StandardPropertyValidator to nifi-utils, documented scope/purpose of a few util libs, removed deps from nifi-utils.

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
new file mode 100644
index 0000000..7a09f5f
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.SocketChannel;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import org.apache.nifi.remote.exception.TransmissionDisabledException;
+import org.apache.nifi.remote.io.socket.BufferStateManager;
+import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
+import org.apache.nifi.security.util.CertificateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLSocketChannel implements Closeable {
+
+    public static final int MAX_WRITE_SIZE = 65536;
+
+    private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class);
+    private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS);
+
+    private final String hostname;
+    private final int port;
+    private final SSLEngine engine;
+    private final SocketAddress socketAddress;
+
+    private BufferStateManager streamInManager;
+    private BufferStateManager streamOutManager;
+    private BufferStateManager appDataManager;
+
+    private SocketChannel channel;
+
+    private final byte[] oneByteBuffer = new byte[1];
+
+    private int timeoutMillis = 30000;
+    private volatile boolean connected = false;
+    private boolean handshaking = false;
+    private boolean closed = false;
+    private volatile boolean interrupted = false;
+
+    public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException {
+        this.socketAddress = new InetSocketAddress(hostname, port);
+        this.channel = SocketChannel.open();
+        this.hostname = hostname;
+        this.port = port;
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException {
+        if (!socketChannel.isConnected()) {
+            throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+        }
+
+        this.channel = socketChannel;
+
+        this.socketAddress = socketChannel.getRemoteAddress();
+        final Socket socket = socketChannel.socket();
+        this.hostname = socket.getInetAddress().getHostName();
+        this.port = socket.getPort();
+
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        this.engine.setNeedClientAuth(true);
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel socketChannel) throws IOException {
+        if (!socketChannel.isConnected()) {
+            throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel");
+        }
+
+        this.channel = socketChannel;
+
+        this.socketAddress = socketChannel.getRemoteAddress();
+        final Socket socket = socketChannel.socket();
+        this.hostname = socket.getInetAddress().getHostName();
+        this.port = socket.getPort();
+
+        // don't set useClientMode or needClientAuth, use the engine as is and let the caller configure it
+        this.engine = sslEngine;
+
+        streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
+    }
+
+    public void setTimeout(final int millis) {
+        this.timeoutMillis = millis;
+    }
+
+    public int getTimeout() {
+        return timeoutMillis;
+    }
+
+    public void connect() throws IOException {
+        try {
+            channel.configureBlocking(false);
+            if (!channel.isConnected()) {
+                final long startTime = System.currentTimeMillis();
+
+                if (!channel.connect(socketAddress)) {
+                    while (!channel.finishConnect()) {
+                        if (interrupted) {
+                            throw new TransmissionDisabledException();
+                        }
+                        if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                            throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port);
+                        }
+
+                        try {
+                            Thread.sleep(50L);
+                        } catch (final InterruptedException e) {
+                        }
+                    }
+                }
+            }
+            engine.beginHandshake();
+
+            performHandshake();
+            logger.debug("{} Successfully completed SSL handshake", this);
+
+            streamInManager.clear();
+            streamOutManager.clear();
+            appDataManager.clear();
+
+            connected = true;
+        } catch (final Exception e) {
+            logger.error("{} Failed to connect due to {}", this, e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            closeQuietly(channel);
+            engine.closeInbound();
+            engine.closeOutbound();
+            throw e;
+        }
+    }
+
+    public String getDn() throws CertificateException, SSLPeerUnverifiedException {
+        final Certificate[] certs = engine.getSession().getPeerCertificates();
+        if (certs == null || certs.length == 0) {
+            throw new SSLPeerUnverifiedException("No certificates found");
+        }
+
+        final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
+        cert.checkValidity();
+        return cert.getSubjectDN().getName().trim();
+    }
+
+    private void performHandshake() throws IOException {
+        // Generate handshake message
+        final byte[] emptyMessage = new byte[0];
+        handshaking = true;
+        logger.debug("{} Performing Handshake", this);
+
+        try {
+            while (true) {
+                switch (engine.getHandshakeStatus()) {
+                    case FINISHED:
+                        return;
+                    case NEED_WRAP: {
+                        final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+
+                        final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer);
+                        if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) {
+                            streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+                            continue;
+                        }
+
+                        if (wrapHelloResult.getStatus() != Status.OK) {
+                            throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: "
+                                    + wrapHelloResult.toString());
+                        }
+
+                        logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult);
+
+                        final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+                        final int bytesToSend = readableStreamOut.remaining();
+                        writeFully(readableStreamOut);
+                        logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend);
+
+                        streamOutManager.clear();
+                    }
+                    continue;
+                    case NEED_UNWRAP: {
+                        final ByteBuffer readableDataIn = streamInManager.prepareForRead(0);
+                        final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+                        // Read handshake response from other side
+                        logger.trace("{} Unwrapping: {} to {}", this, readableDataIn, appData);
+                        SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
+                        logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
+
+                        if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) {
+                            final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                            final int bytesRead = readData(writableDataIn);
+                            if (bytesRead > 0) {
+                                logger.trace("{} Read {} bytes for handshake", this, bytesRead);
+                            }
+
+                            if (bytesRead < 0) {
+                                throw new SSLHandshakeException("Reached End-of-File marker while performing handshake");
+                            }
+                        } else if (handshakeResponseResult.getStatus() == Status.CLOSED) {
+                            throw new IOException("Channel was closed by peer during handshake");
+                        } else {
+                            streamInManager.compact();
+                            appDataManager.clear();
+                        }
+                    }
+                    break;
+                    case NEED_TASK:
+                        performTasks();
+                        continue;
+                    case NOT_HANDSHAKING:
+                        return;
+                }
+            }
+        } finally {
+            handshaking = false;
+        }
+    }
+
+    private void performTasks() {
+        Runnable runnable;
+        while ((runnable = engine.getDelegatedTask()) != null) {
+            runnable.run();
+        }
+    }
+
+    private void closeQuietly(final Closeable closeable) {
+        try {
+            closeable.close();
+        } catch (final Exception e) {
+        }
+    }
+
+    public void consume() throws IOException {
+        channel.shutdownInput();
+
+        final byte[] b = new byte[4096];
+        final ByteBuffer buffer = ByteBuffer.wrap(b);
+        int readCount;
+        do {
+            readCount = channel.read(buffer);
+            buffer.flip();
+        } while (readCount > 0);
+    }
+
+    private int readData(final ByteBuffer dest) throws IOException {
+        final long startTime = System.currentTimeMillis();
+
+        while (true) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            if (dest.remaining() == 0) {
+                return 0;
+            }
+
+            final int readCount = channel.read(dest);
+
+            if (readCount == 0) {
+                if (System.currentTimeMillis() > startTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException();
+                }
+
+                continue;
+            }
+
+            logger.trace("{} Read {} bytes", this, readCount);
+            return readCount;
+        }
+    }
+
+    private Status encryptAndWriteFully(final BufferStateManager src) throws IOException {
+        SSLEngineResult result = null;
+
+        final ByteBuffer buff = src.prepareForRead(0);
+        final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+
+        logger.trace("{} Encrypting {} bytes", this, buff.remaining());
+        while (buff.remaining() > 0) {
+            result = engine.wrap(buff, outBuff);
+            if (result.getStatus() == Status.OK) {
+                final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0);
+                writeFully(readableOutBuff);
+                streamOutManager.clear();
+            } else {
+                return result.getStatus();
+            }
+        }
+
+        return result.getStatus();
+    }
+
+    private void writeFully(final ByteBuffer src) throws IOException {
+        long lastByteWrittenTime = System.currentTimeMillis();
+
+        int bytesWritten = 0;
+        while (src.hasRemaining()) {
+            if (interrupted) {
+                throw new TransmissionDisabledException();
+            }
+
+            final int written = channel.write(src);
+            bytesWritten += written;
+            final long now = System.currentTimeMillis();
+            if (written > 0) {
+                lastByteWrittenTime = now;
+            } else {
+                if (now > lastByteWrittenTime + timeoutMillis) {
+                    throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port);
+                }
+                try {
+                    TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS);
+                } catch (final InterruptedException e) {
+                    close();
+                    Thread.currentThread().interrupt(); // set the interrupt status
+                    throw new ClosedByInterruptException();
+                }
+            }
+        }
+
+        logger.trace("{} Wrote {} bytes", this, bytesWritten);
+    }
+
+    public boolean isClosed() {
+        if (closed) {
+            return true;
+        }
+        // need to detect if peer has sent closure handshake...if so the answer is true
+        final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        int readCount = 0;
+        try {
+            readCount = channel.read(writableInBuffer);
+        } catch (IOException e) {
+            logger.error("{} Failed to readData due to {}", new Object[]{this, e});
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+            readCount = -1; // treat the condition same as if End of Stream
+        }
+        if (readCount == 0) {
+            return false;
+        }
+        if (readCount > 0) {
+            logger.trace("{} Read {} bytes", this, readCount);
+
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            try {
+                SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+                logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse);
+                if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
+                    // Drain the incoming TCP buffer
+                    final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+                    int bytesDiscarded = channel.read(discardBuffer);
+                    while (bytesDiscarded > 0) {
+                        discardBuffer.clear();
+                        bytesDiscarded = channel.read(discardBuffer);
+                    }
+                    engine.closeInbound();
+                } else {
+                    streamInManager.compact();
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e});
+                if (logger.isDebugEnabled()) {
+                    logger.error("", e);
+                }
+            }
+        }
+        // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake
+        // so go ahead and close down the channel
+        closeQuietly(channel.socket());
+        closeQuietly(channel);
+        closed = true;
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("{} Closing Connection", this);
+        if (channel == null) {
+            return;
+        }
+
+        if (closed) {
+            return;
+        }
+
+        try {
+            engine.closeOutbound();
+
+            final byte[] emptyMessage = new byte[0];
+
+            final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage);
+            final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer);
+
+            if (handshakeResult.getStatus() != Status.CLOSED) {
+                throw new IOException("Invalid close state - will not send network data");
+            }
+
+            final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1);
+            writeFully(readableStreamOut);
+        } finally {
+            // Drain the incoming TCP buffer
+            final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
+            try {
+                int bytesDiscarded = channel.read(discardBuffer);
+                while (bytesDiscarded > 0) {
+                    discardBuffer.clear();
+                    bytesDiscarded = channel.read(discardBuffer);
+                }
+            } catch (Exception e) {
+            }
+
+            closeQuietly(channel.socket());
+            closeQuietly(channel);
+            closed = true;
+        }
+    }
+
+    private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) {
+        // If any data already exists in the application data buffer, copy it to the buffer.
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+
+        final int appDataRemaining = appDataBuffer.remaining();
+        if (appDataRemaining > 0) {
+            final int bytesToCopy = Math.min(len, appDataBuffer.remaining());
+            appDataBuffer.get(buffer, offset, bytesToCopy);
+
+            final int bytesCopied = appDataRemaining - appDataBuffer.remaining();
+            logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
+                    this, bytesToCopy, bytesCopied);
+            return bytesCopied;
+        }
+        return 0;
+    }
+
+    public int available() throws IOException {
+        ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+        final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining();
+        if (buffered > 0) {
+            return buffered;
+        }
+
+        final boolean wasAbleToRead = isDataAvailable();
+        if (!wasAbleToRead) {
+            return 0;
+        }
+
+        appDataBuffer = appDataManager.prepareForRead(1);
+        streamDataBuffer = streamInManager.prepareForRead(1);
+        return appDataBuffer.remaining() + streamDataBuffer.remaining();
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1);
+        final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1);
+
+        if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) {
+            return true;
+        }
+
+        final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+        final int bytesRead = channel.read(writableBuffer);
+        return (bytesRead > 0);
+    }
+
+    public int read() throws IOException {
+        final int bytesRead = read(oneByteBuffer);
+        if (bytesRead == -1) {
+            return -1;
+        }
+        return oneByteBuffer[0] & 0xFF;
+    }
+
+    public int read(final byte[] buffer) throws IOException {
+        return read(buffer, 0, buffer.length);
+    }
+
+    public int read(final byte[] buffer, final int offset, final int len) throws IOException {
+        logger.debug("{} Reading up to {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int copied = copyFromAppDataBuffer(buffer, offset, len);
+        if (copied > 0) {
+            return copied;
+        }
+
+        appDataManager.clear();
+
+        while (true) {
+            // prepare buffers and call unwrap
+            final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1);
+            SSLEngineResult unwrapResponse = null;
+            final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
+            unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
+            logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse);
+
+            switch (unwrapResponse.getStatus()) {
+                case BUFFER_OVERFLOW:
+                    throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap");
+                case BUFFER_UNDERFLOW: {
+//                appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize());
+
+                    final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize());
+                    final int bytesRead = readData(writableInBuffer);
+                    if (bytesRead < 0) {
+                        return -1;
+                    }
+
+                    continue;
+                }
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case OK: {
+                    copied = copyFromAppDataBuffer(buffer, offset, len);
+                    if (copied == 0) {
+                        throw new IOException("Failed to decrypt data");
+                    }
+                    streamInManager.compact();
+                    return copied;
+                }
+            }
+        }
+    }
+
+    public void write(final int data) throws IOException {
+        write(new byte[]{(byte) data}, 0, 1);
+    }
+
+    public void write(final byte[] data) throws IOException {
+        write(data, 0, data.length);
+    }
+
+    public void write(final byte[] data, final int offset, final int len) throws IOException {
+        logger.debug("{} Writing {} bytes of data", this, len);
+
+        if (!connected) {
+            connect();
+        }
+
+        int iterations = len / MAX_WRITE_SIZE;
+        if (len % MAX_WRITE_SIZE > 0) {
+            iterations++;
+        }
+
+        for (int i = 0; i < iterations; i++) {
+            streamOutManager.clear();
+            final int itrOffset = offset + i * MAX_WRITE_SIZE;
+            final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE);
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen);
+
+            final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ);
+            final Status status = encryptAndWriteFully(buffMan);
+            switch (status) {
+                case BUFFER_OVERFLOW:
+                    streamOutManager.ensureSize(engine.getSession().getPacketBufferSize());
+                    appDataManager.ensureSize(engine.getSession().getApplicationBufferSize());
+                    continue;
+                case OK:
+                    continue;
+                case CLOSED:
+                    throw new IOException("Channel is closed");
+                case BUFFER_UNDERFLOW:
+                    throw new AssertionError("Got Buffer Underflow but should not have...");
+            }
+        }
+    }
+
+    public void interrupt() {
+        this.interrupted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
new file mode 100644
index 0000000..ca6de85
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SSLSocketChannelInputStream extends InputStream {
+
+    private final SSLSocketChannel channel;
+
+    public SSLSocketChannelInputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    public void consume() throws IOException {
+        channel.consume();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return channel.read();
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return channel.read(b);
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        return channel.read(b, off, len);
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which will also close the OutputStream and connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+
+    @Override
+    public int available() throws IOException {
+        return channel.available();
+    }
+
+    public boolean isDataAvailable() throws IOException {
+        return available() > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
new file mode 100644
index 0000000..262cf54
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class SSLSocketChannelOutputStream extends OutputStream {
+
+    private final SSLSocketChannel channel;
+
+    public SSLSocketChannelOutputStream(final SSLSocketChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public void write(final int b) throws IOException {
+        channel.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        channel.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        channel.write(b, off, len);
+    }
+
+    /**
+     * Closes the underlying SSLSocketChannel, which also will close the InputStream and the connection
+     */
+    @Override
+    public void close() throws IOException {
+        channel.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-site-to-site-client/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml
index 0187a04..fd8d33f 100644
--- a/nifi-commons/nifi-site-to-site-client/pom.xml
+++ b/nifi-commons/nifi-site-to-site-client/pom.xml
@@ -38,11 +38,17 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.codehaus.jackson</groupId>
             <artifactId>jackson-mapper-asl</artifactId>
-            <version>1.9.13</version>
         </dependency>
-
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-client-dto</artifactId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-socket-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/pom.xml b/nifi-commons/nifi-socket-utils/pom.xml
index 5328b73..a01284b 100644
--- a/nifi-commons/nifi-socket-utils/pom.xml
+++ b/nifi-commons/nifi-socket-utils/pom.xml
@@ -29,7 +29,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-utils</artifactId>
+            <artifactId>nifi-security-utils</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/pom.xml b/nifi-commons/nifi-utils/pom.xml
index be3ac84..f5f261f 100644
--- a/nifi-commons/nifi-utils/pom.xml
+++ b/nifi-commons/nifi-utils/pom.xml
@@ -23,13 +23,21 @@
     <artifactId>nifi-utils</artifactId>
     <version>1.1.0-SNAPSHOT</version>
     <packaging>jar</packaging>
-    <!--
-    This project intentionally minimizes dependencies beyond that pulled in by the parent. It is a general purpose utility library and should keep its surface/tension minimal.
-    -->
+    <description>
+        This nifi-utils module should be a general purpose place to store widely
+        and generally useful functions that any component might want to leverage.
+        NO DEPENDENCIES should be added.  This module is likely to be leveraged by
+        every extension and should not bring along any other dependencies.  The only
+        dependency intended is the nifi-api and even this is expected to be already
+        provided in any case where it would  be used.  The typical place this util
+        would be found is within a nar and all nars already have nifi-api as a parent
+        dependency.  The nifi-api can be thought of as a NiFi Application Container level
+        dependency.
+    </description>
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-utils</artifactId>
+            <artifactId>nifi-api</artifactId>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
new file mode 100644
index 0000000..2d1a407
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+
+public class FlowFileFilters {
+
+    /**
+     * Returns a new {@link FlowFileFilter} that will pull FlowFiles until the
+     * maximum file size has been reached, or the maximum FlowFile Count was
+     * been reached (this is important because FlowFiles may be 0 bytes!). If
+     * the first FlowFile exceeds the max size, the FlowFile will be selected
+     * and no other FlowFile will be.
+     *
+     * @param maxSize the maximum size of the group of FlowFiles
+     * @param unit the unit of the <code>maxSize</code> argument
+     * @param maxCount the maximum number of FlowFiles to pull
+     * @return filter
+     */
+    public static FlowFileFilter newSizeBasedFilter(final double maxSize, final DataUnit unit, final int maxCount) {
+        final double maxBytes = DataUnit.B.convert(maxSize, unit);
+
+        return new FlowFileFilter() {
+            int count = 0;
+            long size = 0L;
+
+            @Override
+            public FlowFileFilterResult filter(final FlowFile flowFile) {
+                if (count == 0) {
+                    count++;
+                    size += flowFile.getSize();
+
+                    return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+                }
+
+                if ((size + flowFile.getSize() > maxBytes) || (count + 1 > maxCount)) {
+                    return FlowFileFilterResult.REJECT_AND_TERMINATE;
+                }
+
+                count++;
+                size += flowFile.getSize();
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            }
+
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb9cbccc/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
new file mode 100644
index 0000000..a577bc8
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.UnsupportedCharsetException;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.util.FormatUtils;
+
+public class StandardValidators {
+
+    //
+    //
+    // STATICALLY DEFINED VALIDATORS
+    //
+    //
+    public static final Validator ATTRIBUTE_KEY_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            builder.subject(subject).input(input);
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return builder.valid(true).explanation("Contains Expression Language").build();
+            }
+
+            try {
+                FlowFile.KeyValidator.validateKey(input);
+                builder.valid(true);
+            } catch (final IllegalArgumentException e) {
+                builder.valid(false).explanation(e.getMessage());
+            }
+
+            return builder.build();
+        }
+    };
+
+    public static final Validator ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            builder.subject("Property Name").input(subject);
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return builder.valid(true).explanation("Contains Expression Language").build();
+            }
+
+            try {
+                FlowFile.KeyValidator.validateKey(subject);
+                builder.valid(true);
+            } catch (final IllegalArgumentException e) {
+                builder.valid(false).explanation(e.getMessage());
+            }
+
+            return builder.build();
+        }
+    };
+
+    public static final Validator POSITIVE_INTEGER_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            try {
+                final int intVal = Integer.parseInt(value);
+
+                if (intVal <= 0) {
+                    reason = "not a positive value";
+                }
+            } catch (final NumberFormatException e) {
+                reason = "not a valid integer";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    public static final Validator POSITIVE_LONG_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            try {
+                final long longVal = Long.parseLong(value);
+
+                if (longVal <= 0) {
+                    reason = "not a positive value";
+                }
+            } catch (final NumberFormatException e) {
+                reason = "not a valid 64-bit integer";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    public static final Validator PORT_VALIDATOR = createLongValidator(1, 65535, true);
+
+    /**
+     * {@link Validator} that ensures that value's length > 0
+     */
+    public static final Validator NON_EMPTY_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build();
+        }
+    };
+
+    /**
+     * {@link Validator} that ensures that value has 1+ non-whitespace
+     * characters
+     */
+    public static final Validator NON_BLANK_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            return new ValidationResult.Builder().subject(subject).input(value)
+                    .valid(value != null && !value.trim().isEmpty())
+                    .explanation(subject
+                            + " must contain at least one character that is not white space").build();
+        }
+    };
+
+    public static final Validator BOOLEAN_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            final boolean valid = "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
+            final String explanation = valid ? null : "Value must be 'true' or 'false'";
+            return new ValidationResult.Builder().subject(subject).input(value).valid(valid).explanation(explanation).build();
+        }
+    };
+
+    public static final Validator INTEGER_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            try {
+                Integer.parseInt(value);
+            } catch (final NumberFormatException e) {
+                reason = "not a valid integer";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    public static final Validator LONG_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            try {
+                Long.parseLong(value);
+            } catch (final NumberFormatException e) {
+                reason = "not a valid Long";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    public static final Validator ISO8061_INSTANT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+
+            try {
+                Instant.parse(input);
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid ISO8061 Instant Date").valid(true).build();
+            } catch (final Exception e) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid ISO8061 Instant Date, please enter in UTC time").valid(false).build();
+            }
+        }
+    };
+
+    public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            try {
+                final int intVal = Integer.parseInt(value);
+
+                if (intVal < 0) {
+                    reason = "value is negative";
+                }
+            } catch (final NumberFormatException e) {
+                reason = "value is not a valid integer";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    public static final Validator CHARACTER_SET_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                final ResultType resultType = context.newExpressionLanguageCompiler().getResultType(value);
+                if (!resultType.equals(ResultType.STRING)) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(value)
+                            .valid(false)
+                            .explanation("Expected Attribute Query to return type " + ResultType.STRING + " but query returns type " + resultType)
+                            .build();
+                }
+
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            try {
+                if (!Charset.isSupported(value)) {
+                    reason = "Character Set is not supported by this JVM.";
+                }
+            } catch (final UnsupportedCharsetException uce) {
+                reason = "Character Set is not supported by this JVM.";
+            } catch (final IllegalArgumentException iae) {
+                reason = "Character Set value cannot be null.";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    /**
+     * URL Validator that does not allow the Expression Language to be used
+     */
+    public static final Validator URL_VALIDATOR = createURLValidator();
+
+    public static final Validator URI_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            try {
+                new URI(input);
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URI").valid(true).build();
+            } catch (final Exception e) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URI").valid(false).build();
+            }
+        }
+    };
+
+    public static final Validator REGULAR_EXPRESSION_VALIDATOR = createRegexValidator(0, Integer.MAX_VALUE, false);
+
+    public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                try {
+                    final String result = context.newExpressionLanguageCompiler().validateExpression(input, true);
+                    if (!isEmpty(result)) {
+                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(result).build();
+                    }
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
+                }
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+
+    };
+
+    /**
+     * @param value to test
+     * @return true if value is null or empty string; does not trim before
+     * testing
+     */
+    private static boolean isEmpty(final String value) {
+        return value == null || value.length() == 0;
+    }
+
+    public static final Validator TIME_PERIOD_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
+            }
+            if (Pattern.compile(FormatUtils.TIME_DURATION_REGEX).matcher(input.toLowerCase()).matches()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } else {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation("Must be of format <duration> <TimeUnit> where <duration> is a "
+                                + "non-negative integer and TimeUnit is a supported Time Unit, such "
+                                + "as: nanos, millis, secs, mins, hrs, days")
+                        .build();
+            }
+        }
+    };
+
+    public static final Validator DATA_SIZE_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            if (input == null) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation("Data Size cannot be null")
+                        .build();
+            }
+            if (Pattern.compile(DataUnit.DATA_SIZE_REGEX).matcher(input.toUpperCase()).matches()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } else {
+                return new ValidationResult.Builder()
+                        .subject(subject).input(input)
+                        .valid(false)
+                        .explanation("Must be of format <Data Size> <Data Unit> where <Data Size>"
+                                + " is a non-negative integer and <Data Unit> is a supported Data"
+                                + " Unit, such as: B, KB, MB, GB, TB")
+                        .build();
+            }
+        }
+    };
+
+    public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true);
+
+    //
+    //
+    // FACTORY METHODS FOR VALIDATORS
+    //
+    //
+    public static Validator createDirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean createDirectoryIfMissing) {
+        return new DirectoryExistsValidator(allowExpressionLanguage, createDirectoryIfMissing);
+    }
+
+    private static Validator createURLValidator() {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+
+                try {
+                    final String evaluatedInput = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+                    new URL(evaluatedInput);
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid URL").valid(true).build();
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URL").valid(false).build();
+                }
+            }
+        };
+    }
+
+    public static Validator createURLorFileValidator() {
+        return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            try {
+                PropertyValue propertyValue = context.newPropertyValue(input);
+                String evaluatedInput = (propertyValue == null) ? input : propertyValue.evaluateAttributeExpressions().getValue();
+
+                boolean validUrl = true;
+
+                // First check to see if it is a valid URL
+                try {
+                    new URL(evaluatedInput);
+                } catch (MalformedURLException mue) {
+                    validUrl = false;
+                }
+
+                boolean validFile = true;
+                if (!validUrl) {
+                    // Check to see if it is a file and it exists
+                    final File file = new File(evaluatedInput);
+                    validFile = file.exists();
+                }
+
+                final boolean valid = validUrl || validFile;
+                final String reason = valid ? "Valid URL or file" : "Not a valid URL or file";
+                return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(valid).build();
+
+            } catch (final Exception e) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid URL or file").valid(false).build();
+            }
+        };
+    }
+
+    public static Validator createListValidator(boolean trimEntries, boolean excludeEmptyEntries, Validator validator) {
+        return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+            try {
+                if (input == null) {
+                    return new ValidationResult.Builder().subject(subject).input(null).explanation("List must have at least one non-empty element").valid(false).build();
+                }
+                final String[] list = input.split(",");
+                for (String item : list) {
+                    String itemToValidate = trimEntries ? item.trim() : item;
+                    if (!isEmpty(itemToValidate) || !excludeEmptyEntries) {
+                        ValidationResult result = validator.validate(subject, itemToValidate, context);
+                        if (!result.isValid()) {
+                            return result;
+                        }
+                    }
+                }
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid List").valid(true).build();
+            } catch (final Exception e) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Not a valid list").valid(false).build();
+            }
+        };
+    }
+
+    public static Validator createTimePeriodValidator(final long minTime, final TimeUnit minTimeUnit, final long maxTime, final TimeUnit maxTimeUnit) {
+        return new TimePeriodValidator(minTime, minTimeUnit, maxTime, maxTimeUnit);
+    }
+
+    public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType) {
+        return createAttributeExpressionLanguageValidator(expectedResultType, true);
+    }
+
+    public static Validator createDataSizeBoundsValidator(final long minBytesInclusive, final long maxBytesInclusive) {
+        return new Validator() {
+
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+
+                final ValidationResult vr = DATA_SIZE_VALIDATOR.validate(subject, input, context);
+                if (!vr.isValid()) {
+                    return vr;
+                }
+                final long dataSizeBytes = DataUnit.parseDataSize(input, DataUnit.B).longValue();
+                if (dataSizeBytes < minBytesInclusive) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be smaller than " + minBytesInclusive + " bytes").build();
+                }
+                if (dataSizeBytes > maxBytesInclusive) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Cannot be larger than " + maxBytesInclusive + " bytes").build();
+                }
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+        };
+
+    }
+
+    public static Validator createRegexMatchingValidator(final Pattern pattern) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+
+                final boolean matches = pattern.matcher(input).matches();
+                return new ValidationResult.Builder()
+                        .input(input)
+                        .subject(subject)
+                        .valid(matches)
+                        .explanation(matches ? null : "Value does not match regular expression: " + pattern.pattern())
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Creates a @{link Validator} that ensure that a value is a valid Java
+     * Regular Expression with at least <code>minCapturingGroups</code>
+     * capturing groups and at most <code>maxCapturingGroups</code> capturing
+     * groups. If <code>supportAttributeExpressionLanguage</code> is set to
+     * <code>true</code>, the value may also include the Expression Language,
+     * but the result of evaluating the Expression Language will be applied
+     * before the Regular Expression is performed. In this case, the Expression
+     * Language will not support FlowFile Attributes but only System/JVM
+     * Properties
+     *
+     * @param minCapturingGroups minimum capturing groups allowed
+     * @param maxCapturingGroups maximum capturing groups allowed
+     * @param supportAttributeExpressionLanguage whether or not to support
+     * expression language
+     * @return validator
+     */
+    public static Validator createRegexValidator(final int minCapturingGroups, final int maxCapturingGroups, final boolean supportAttributeExpressionLanguage) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+                try {
+                    final String substituted;
+                    if (supportAttributeExpressionLanguage) {
+                        try {
+                            substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                        } catch (final Exception e) {
+                            return new ValidationResult.Builder()
+                                    .subject(subject)
+                                    .input(value)
+                                    .valid(false)
+                                    .explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString())
+                                    .build();
+                        }
+                    } else {
+                        substituted = value;
+                    }
+
+                    final Pattern pattern = Pattern.compile(substituted);
+                    final int numGroups = pattern.matcher("").groupCount();
+                    if (numGroups < minCapturingGroups || numGroups > maxCapturingGroups) {
+                        return new ValidationResult.Builder()
+                                .subject(subject)
+                                .input(value)
+                                .valid(false)
+                                .explanation("RegEx is required to have between " + minCapturingGroups + " and " + maxCapturingGroups + " Capturing Groups but has " + numGroups)
+                                .build();
+                    }
+
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(value)
+                            .valid(false)
+                            .explanation("Not a valid Java Regular Expression")
+                            .build();
+                }
+
+            }
+        };
+    }
+
+    public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType, final boolean allowExtraCharacters) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                final String syntaxError = context.newExpressionLanguageCompiler().validateExpression(input, allowExtraCharacters);
+                if (syntaxError != null) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(syntaxError).build();
+                }
+
+                final ResultType resultType = allowExtraCharacters ? ResultType.STRING : context.newExpressionLanguageCompiler().getResultType(input);
+                if (!resultType.equals(expectedResultType)) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .valid(false)
+                            .explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + resultType)
+                            .build();
+                }
+
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+        };
+    }
+
+    public static Validator createLongValidator(final long minimum, final long maximum, final boolean inclusive) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                }
+
+                String reason = null;
+                try {
+                    final long longVal = Long.parseLong(input);
+                    if (longVal < minimum || (!inclusive && longVal == minimum) | longVal > maximum || (!inclusive && longVal == maximum)) {
+                        reason = "Value must be between " + minimum + " and " + maximum + " (" + (inclusive ? "inclusive" : "exclusive") + ")";
+                    }
+                } catch (final NumberFormatException e) {
+                    reason = "not a valid integer";
+                }
+
+                return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(reason == null).build();
+            }
+
+        };
+    }
+
+    //
+    //
+    // SPECIFIC VALIDATOR IMPLEMENTATIONS THAT CANNOT BE ANONYMOUS CLASSES
+    //
+    //
+    static class TimePeriodValidator implements Validator {
+
+        private final Pattern pattern;
+
+        private final long minNanos;
+        private final long maxNanos;
+
+        private final String minValueEnglish;
+        private final String maxValueEnglish;
+
+        public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) {
+            pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
+
+            this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit);
+            this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit);
+            this.minValueEnglish = minValue + " " + minTimeUnit.toString();
+            this.maxValueEnglish = maxValue + " " + maxTimeUnit.toString();
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
+
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
+            }
+            final String lowerCase = input.toLowerCase();
+            final boolean validSyntax = pattern.matcher(lowerCase).matches();
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            if (validSyntax) {
+                final long nanos = FormatUtils.getTimeDuration(lowerCase, TimeUnit.NANOSECONDS);
+
+                if (nanos < minNanos || nanos > maxNanos) {
+                    builder.subject(subject).input(input).valid(false)
+                            .explanation("Must be in the range of " + minValueEnglish + " to " + maxValueEnglish);
+                } else {
+                    builder.subject(subject).input(input).valid(true);
+                }
+            } else {
+                builder.subject(subject).input(input).valid(false)
+                        .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative "
+                                + "integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days");
+            }
+            return builder.build();
+        }
+    }
+
+    public static class FileExistsValidator implements Validator {
+
+        private final boolean allowEL;
+
+        public FileExistsValidator(final boolean allowExpressionLanguage) {
+            this.allowEL = allowExpressionLanguage;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            final String substituted;
+            if (allowEL) {
+                try {
+                    substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(false)
+                            .explanation("Not a valid Expression Language value: " + e.getMessage()).build();
+                }
+            } else {
+                substituted = value;
+            }
+
+            final File file = new File(substituted);
+            final boolean valid = file.exists();
+            final String explanation = valid ? null : "File " + file + " does not exist";
+            return new ValidationResult.Builder().subject(subject).input(value).valid(valid).explanation(explanation).build();
+        }
+    }
+
+    public static class StringLengthValidator implements Validator {
+
+        private final int minimum;
+        private final int maximum;
+
+        public StringLengthValidator(int minimum, int maximum) {
+            this.minimum = minimum;
+            this.maximum = maximum;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (value.length() < minimum || value.length() > maximum) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .valid(false)
+                        .input(value)
+                        .explanation(String.format("String length invalid [min: %d, max: %d]", minimum, maximum))
+                        .build();
+            } else {
+                return new ValidationResult.Builder()
+                        .valid(true)
+                        .input(value)
+                        .subject(subject)
+                        .build();
+            }
+        }
+    }
+
+    public static class DirectoryExistsValidator implements Validator {
+
+        private final boolean allowEL;
+        private final boolean create;
+
+        public DirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean create) {
+            this.allowEL = allowExpressionLanguage;
+            this.create = create;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
+            }
+
+            final String substituted;
+            if (allowEL) {
+                try {
+                    substituted = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(false)
+                            .explanation("Not a valid Expression Language value: " + e.getMessage()).build();
+                }
+
+                if (substituted.trim().isEmpty() && !value.trim().isEmpty()) {
+                    // User specified an Expression and nothing more... assume valid.
+                    return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
+                }
+            } else {
+                substituted = value;
+            }
+
+            String reason = null;
+            try {
+                final File file = new File(substituted);
+                if (!file.exists()) {
+                    if (!create) {
+                        reason = "Directory does not exist";
+                    } else if (!file.mkdirs()) {
+                        reason = "Directory does not exist and could not be created";
+                    }
+                } else if (!file.isDirectory()) {
+                    reason = "Path does not point to a directory";
+                }
+            } catch (final Exception e) {
+                reason = "Value is not a valid directory name";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    }
+}