You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:05 UTC

[42/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java b/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
new file mode 100644
index 0000000..aa6f8f3
--- /dev/null
+++ b/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * Minimal null-safe String helpers modeled on the Apache Commons Lang
+ * StringUtils. Keeping them here spares the core a Commons Lang dependency.
+ */
+public class StringUtils {
+
+    public static final String EMPTY = "";
+
+    /** Tells whether str is null, empty, or made up solely of whitespace. */
+    public static boolean isBlank(final String str) {
+        final int length = (str == null) ? 0 : str.length();
+        for (int idx = 0; idx < length; idx++) {
+            if (!Character.isWhitespace(str.charAt(idx))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Tells whether str is null or has zero length. */
+    public static boolean isEmpty(final String str) {
+        return str == null || str.length() == 0;
+    }
+
+    /**
+     * Null-safe, case-sensitive prefix test. Two nulls are considered a match;
+     * a null on only one side never matches.
+     */
+    public static boolean startsWith(final String str, final String prefix) {
+        if (str == null || prefix == null) {
+            return str == null && prefix == null;
+        }
+        return str.startsWith(prefix);
+    }
+
+    /**
+     * Returns the text that follows the first occurrence of separator. A null
+     * or empty str is handed back unchanged; EMPTY is returned when separator
+     * is null or does not occur in str.
+     */
+    public static String substringAfter(final String str, final String separator) {
+        if (str == null || str.isEmpty()) {
+            return str;
+        }
+        final int start = (separator == null) ? -1 : str.indexOf(separator);
+        return (start < 0) ? EMPTY : str.substring(start + separator.length());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/pom.xml b/commons/nifi-security-utils/pom.xml
new file mode 100644
index 0000000..76e9ac1
--- /dev/null
+++ b/commons/nifi-security-utils/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-security-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Security Utils</name>
+    <description>Contains security functionality.</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3.2</version>
+        </dependency>
+    </dependencies>
+</project>
+

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
new file mode 100644
index 0000000..087d891
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CertificateUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(CertificateUtils.class);
+
+    /**
+     * Checks whether the keystore at the given location can be loaded with the
+     * given keystore type and password. Every failure while opening or loading
+     * the store is reported as false instead of being thrown.
+     * @param keystore location of the keystore to probe
+     * @param keystoreType type whose name is handed to KeyStore.getInstance
+     * @param password password used to load the keystore
+     * @return true if the keystore loaded; false otherwise
+     * @throws IllegalArgumentException if any argument is null
+     */
+    public static boolean isStoreValid(final URL keystore, final KeystoreType keystoreType, final char[] password) {
+        if (keystore == null) {
+            throw new IllegalArgumentException("keystore may not be null");
+        }
+        if (keystoreType == null) {
+            throw new IllegalArgumentException("keystore type may not be null");
+        }
+        if (password == null) {
+            throw new IllegalArgumentException("password may not be null");
+        }
+
+        BufferedInputStream storeStream = null;
+        try {
+            storeStream = new BufferedInputStream(keystore.openStream());
+
+            // only the outcome of load() matters; the loaded store is discarded
+            final KeyStore probe = KeyStore.getInstance(keystoreType.name());
+            probe.load(storeStream, password);
+            return true;
+        } catch (Exception e) {
+            return false;
+        } finally {
+            if (storeStream != null) {
+                try {
+                    storeStream.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close input stream", ioe);
+                }
+            }
+        }
+    }
+
+    /**
+     * Extracts the username from the specified DN. If the username cannot be
+     * extracted because the CN is in an unrecognized format, the entire CN is
+     * returned. If the CN cannot be extracted because the DN is in an
+     * unrecognized format, the entire DN is returned.
+     *
+     * @param dn the distinguished name to inspect; may be null or blank
+     * @return the username, the CN or the unmodified DN as described above
+     */
+    public static String extractUsername(String dn) {
+        String username = dn;
+        String cn = "";
+
+        // ensure the dn is specified
+        if (StringUtils.isNotBlank(dn)) {
+            // attempt to locate the cn
+            if (dn.startsWith("CN=")) {
+                cn = StringUtils.substringBetween(dn, "CN=", ",");
+            } else if (dn.startsWith("/CN=")) {
+                cn = StringUtils.substringBetween(dn, "CN=", "/");
+            } else if (dn.startsWith("C=") || dn.startsWith("/C=")) {
+                cn = StringUtils.substringAfter(dn, "CN=");
+            } else if (dn.startsWith("/") && StringUtils.contains(dn, "CN=")) {
+                cn = StringUtils.substringAfter(dn, "CN=");
+            }
+
+            // attempt to get the username from the cn
+            if (StringUtils.isNotBlank(cn)) {
+                if (cn.endsWith(")")) {
+                    // substringBetween gives null without a "(" and "" for "()": keep the cn then
+                    final String inParens = StringUtils.substringBetween(cn, "(", ")");
+                    username = StringUtils.isNotBlank(inParens) ? inParens : cn;
+                } else if (cn.contains(" ")) {
+                    username = StringUtils.substringAfterLast(cn, " ");
+                } else {
+                    username = cn;
+                }
+            }
+        }
+        return username;
+    }
+
+    /**
+     * Returns a list of subject alternative names. Any name that is represented
+     * as a String by X509Certificate.getSubjectAlternativeNames() is converted
+     * to lowercase, independently of the default locale, and returned.
+     *
+     * @param certificate a certificate
+     * @return a list of subject alternative names; list is never null
+     * @throws CertificateParsingException if parsing the certificate failed
+     */
+    public static List<String> getSubjectAlternativeNames(final X509Certificate certificate) throws CertificateParsingException {
+
+        final Collection<List<?>> altNames = certificate.getSubjectAlternativeNames();
+        if (altNames == null) {
+            return new ArrayList<>();
+        }
+
+        final List<String> result = new ArrayList<>();
+        for (final List<?> generalName : altNames) {
+            /*
+             * generalName has the name type as the first element and a String
+             * or byte array as the second element.  We return any general
+             * names that are String types.
+             *
+             * We don't inspect the numeric name type because some certificates
+             * incorrectly put IPs and DNS names under the wrong name types.
+             */
+            final Object value = generalName.get(1);
+            if (value instanceof String) {
+                // Locale.ROOT keeps the result independent of the JVM default locale
+                result.add(((String) value).toLowerCase(java.util.Locale.ROOT));
+            }
+        }
+
+        return result;
+    }
+
+    private CertificateUtils() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
new file mode 100644
index 0000000..741fdde
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Enumeration capturing essential information about the various encryption
+ * methods that might be supported.
+ *
+ * @author none
+ */
+public enum EncryptionMethod {
+
+    MD5_128AES("PBEWITHMD5AND128BITAES-CBC-OPENSSL", "BC", false),
+    MD5_256AES("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", false),
+    SHA1_RC2("PBEWITHSHA1ANDRC2", "BC", false),
+    SHA1_DES("PBEWITHSHA1ANDDES", "BC", false),
+    MD5_192AES("PBEWITHMD5AND192BITAES-CBC-OPENSSL", "BC", false),
+    MD5_DES("PBEWITHMD5ANDDES", "BC", false),
+    MD5_RC2("PBEWITHMD5ANDRC2", "BC", false),
+    SHA_192AES("PBEWITHSHAAND192BITAES-CBC-BC", "BC", true),
+    SHA_40RC4("PBEWITHSHAAND40BITRC4", "BC", true),
+    SHA256_128AES("PBEWITHSHA256AND128BITAES-CBC-BC", "BC", true),
+    SHA_128RC2("PBEWITHSHAAND128BITRC2-CBC", "BC", true),
+    SHA_128AES("PBEWITHSHAAND128BITAES-CBC-BC", "BC", true),
+    SHA256_192AES("PBEWITHSHA256AND192BITAES-CBC-BC", "BC", true),
+    SHA_2KEYTRIPLEDES("PBEWITHSHAAND2-KEYTRIPLEDES-CBC", "BC", true),
+    SHA256_256AES("PBEWITHSHA256AND256BITAES-CBC-BC", "BC", true),
+    SHA_40RC2("PBEWITHSHAAND40BITRC2-CBC", "BC", true),
+    SHA_256AES("PBEWITHSHAAND256BITAES-CBC-BC", "BC", true),
+    SHA_3KEYTRIPLEDES("PBEWITHSHAAND3-KEYTRIPLEDES-CBC", "BC", true),
+    SHA_TWOFISH("PBEWITHSHAANDTWOFISH-CBC", "BC", true),
+    SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true);
+    private final String algorithm;
+    private final String provider;
+    private final boolean unlimitedStrength;
+
+    EncryptionMethod(String algorithm, String provider, boolean unlimitedStrength) {
+        this.algorithm = algorithm;
+        this.provider = provider;
+        this.unlimitedStrength = unlimitedStrength;
+    }
+
+    public String getProvider() {
+        return provider;
+    }
+
+    public String getAlgorithm() {
+        return algorithm;
+    }
+
+    /**
+     * @return true if algorithm requires unlimited strength policies
+     */
+    public boolean isUnlimitedStrength() {
+        return unlimitedStrength;
+    }
+
+    @Override
+    public String toString() {
+        // style is passed per builder: setDefaultStyle() would change the shared static default
+        final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
+        builder.append("algorithm name", algorithm);
+        builder.append("Requires unlimited strength JCE policy", unlimitedStrength);
+        builder.append("Algorithm Provider", provider);
+        return builder.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
new file mode 100644
index 0000000..18574bb
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+/**
+ * Keystore types. CertificateUtils.isStoreValid passes name() to KeyStore.getInstance.
+ */
+public enum KeystoreType {
+
+    PKCS12,
+    JKS;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
new file mode 100644
index 0000000..9abfcc3
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.PrintWriter;
+import java.io.Writer;
+
+/**
+ * Types of security stores and their related Java system properties.
+ */
+public enum SecurityStoreTypes {
+
+    TRUSTSTORE(
+            "javax.net.ssl.trustStore",
+            "javax.net.ssl.trustStorePassword",
+            "javax.net.ssl.trustStoreType"),
+    KEYSTORE(
+            "javax.net.ssl.keyStore",
+            "javax.net.ssl.keyStorePassword",
+            "javax.net.ssl.keyStoreType");
+
+    /**
+     * Logs the keystore and truststore Java system property values to the given
+     * writer. If logPasswords is true, then the keystore and truststore
+     * password property values are logged as well.
+     *
+     * @param writer a writer to log to; nothing is logged if it is null
+     * @param logPasswords true if passwords should be logged; false otherwise
+     */
+    public static void logProperties(final Writer writer,
+            final boolean logPasswords) {
+        if (writer == null) {
+            return;
+        }
+
+        final PrintWriter pw = new PrintWriter(writer);
+
+        // keystore properties
+        pw.println(
+                KEYSTORE.getStoreProperty() + " = " + System.getProperty(KEYSTORE.getStoreProperty()));
+
+        if (logPasswords) {
+            pw.println(
+                    KEYSTORE.getStorePasswordProperty() + " = "
+                    + System.getProperty(KEYSTORE.getStorePasswordProperty()));
+        }
+
+        pw.println(
+                KEYSTORE.getStoreTypeProperty() + " = "
+                + System.getProperty(KEYSTORE.getStoreTypeProperty()));
+
+        // truststore properties
+        pw.println(
+                TRUSTSTORE.getStoreProperty() + " = "
+                + System.getProperty(TRUSTSTORE.getStoreProperty()));
+
+        if (logPasswords) {
+            pw.println(
+                    TRUSTSTORE.getStorePasswordProperty() + " = "
+                    + System.getProperty(TRUSTSTORE.getStorePasswordProperty()));
+        }
+
+        pw.println(
+                TRUSTSTORE.getStoreTypeProperty() + " = "
+                + System.getProperty(TRUSTSTORE.getStoreTypeProperty()));
+        // only flush: closing the PrintWriter would also close the caller's writer
+        pw.flush();
+    }
+
+    /**
+     * the Java system property for setting the keystore (or truststore) path
+     */
+    private final String storeProperty;
+
+    /**
+     * the Java system property for setting the keystore (or truststore)
+     * password
+     */
+    private final String storePasswordProperty;
+
+    /**
+     * the Java system property for setting the keystore (or truststore) type
+     */
+    private final String storeTypeProperty;
+
+    /**
+     * Creates an instance; each name is stored as given and never changes.
+     *
+     * @param storeProperty the Java system property for setting the keystore
+     * (or truststore) path
+     * @param storePasswordProperty the Java system property for setting the
+     * keystore (or truststore) password
+     * @param storeTypeProperty the Java system property for setting the
+     * keystore (or truststore) type
+     */
+    SecurityStoreTypes(final String storeProperty,
+            final String storePasswordProperty,
+            final String storeTypeProperty) {
+        this.storeProperty = storeProperty;
+        this.storePasswordProperty = storePasswordProperty;
+        this.storeTypeProperty = storeTypeProperty;
+    }
+
+    /**
+     * Returns the keystore (or truststore) property.
+     *
+     * @return the keystore (or truststore) property
+     */
+    public String getStoreProperty() {
+        return storeProperty;
+    }
+
+    /**
+     * Returns the keystore (or truststore) password property.
+     *
+     * @return the keystore (or truststore) password property
+     */
+    public String getStorePasswordProperty() {
+        return storePasswordProperty;
+    }
+
+    /**
+     * Returns the keystore (or truststore) type property.
+     *
+     * @return the keystore (or truststore) type property
+     */
+    public String getStoreTypeProperty() {
+        return storeTypeProperty;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
new file mode 100644
index 0000000..2371b0c
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * A factory for creating SSL contexts using the application's security
+ * properties.
+ *
+ * @author unattributed
+ */
+public final class SslContextFactory {
+
+    public static enum ClientAuth {
+
+        WANT,
+        REQUIRED,
+        NONE
+    }
+
+    /**
+     * Creates a SSLContext instance using the given information.
+     * @param keystore the full path to the keystore
+     * @param keystorePasswd the keystore password
+     * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
+     * @param truststore the full path to the truststore
+     * @param truststorePasswd the truststore password
+     * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
+     * @param clientAuth the type of client authentication
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException
+     * @throws java.io.IOException
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.cert.CertificateException
+     * @throws java.security.UnrecoverableKeyException
+     * @throws java.security.KeyManagementException
+     */
+    public static SSLContext createSslContext(
+            final String keystore, final char[] keystorePasswd, final String keystoreType,
+            final String truststore, final char[] truststorePasswd, final String truststoreType,
+            final ClientAuth clientAuth)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+
+        // prepare the keystore
+        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+        try (final InputStream keyStoreStream = new FileInputStream(keystore)) {
+            keyStore.load(keyStoreStream, keystorePasswd);
+        }
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePasswd);
+
+        // prepare the truststore
+        final KeyStore trustStore = KeyStore.getInstance(truststoreType);
+        try (final InputStream trustStoreStream = new FileInputStream(truststore)) {
+            trustStore.load(trustStoreStream, truststorePasswd);
+        }
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(trustStore);
+
+        // initialize the ssl context
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+        // NOTE(review): getDefaultSSLParameters() is documented to return a copy, so these setters
+        // presumably never reach the context - confirm; callers may need to set client auth themselves
+        if (ClientAuth.REQUIRED == clientAuth) {
+            sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+        } else if (ClientAuth.WANT == clientAuth) {
+            sslContext.getDefaultSSLParameters().setWantClientAuth(true);
+        } else {
+            sslContext.getDefaultSSLParameters().setWantClientAuth(false);
+        }
+
+        return sslContext;
+
+    }
+
+    /**
+     * Builds an SSLContext backed only by the given keystore; the context is
+     * initialized with an empty TrustManager array.
+     *
+     * @param keystore the full path to the keystore
+     * @param keystorePasswd the keystore password
+     * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException
+     * @throws java.io.IOException
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.cert.CertificateException
+     * @throws java.security.UnrecoverableKeyException
+     * @throws java.security.KeyManagementException
+     */
+    public static SSLContext createSslContext(
+            final String keystore, final char[] keystorePasswd, final String keystoreType)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+        // load the keystore from the given file path
+        final KeyStore loadedStore = KeyStore.getInstance(keystoreType);
+        try (final InputStream storeIn = new FileInputStream(keystore)) {
+            loadedStore.load(storeIn, keystorePasswd);
+        }
+
+        // set up the key manager factory on top of the loaded store
+        final String kmfAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+        final KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
+        kmf.init(loadedStore, keystorePasswd);
+
+        // create the TLS context without any trust managers
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(kmf.getKeyManagers(), new TrustManager[0], new SecureRandom());
+        return sslContext;
+    }
+
+    /**
+     * Builds an SSLContext that relies solely on the given truststore; the
+     * context is initialized with an empty KeyManager array.
+     *
+     * @param truststore the full path to the truststore
+     * @param truststorePasswd the truststore password
+     * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException
+     * @throws java.io.IOException
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.cert.CertificateException
+     * @throws java.security.UnrecoverableKeyException
+     * @throws java.security.KeyManagementException
+     */
+    public static SSLContext createTrustSslContext(
+            final String truststore, final char[] truststorePasswd, final String truststoreType)
+            throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+        // load the truststore from the given file path
+        final KeyStore loadedStore = KeyStore.getInstance(truststoreType);
+        try (final InputStream storeIn = new FileInputStream(truststore)) {
+            loadedStore.load(storeIn, truststorePasswd);
+        }
+
+        // set up the trust manager factory on top of the loaded store
+        final String tmfAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
+        final TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
+        tmf.init(loadedStore);
+
+        // create the TLS context without any key managers
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(new KeyManager[0], tmf.getTrustManagers(), new SecureRandom());
+        return sslContext;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/pom.xml b/commons/nifi-socket-utils/pom.xml
new file mode 100644
index 0000000..8e06433
--- /dev/null
+++ b/commons/nifi-socket-utils/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-socket-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Socket Utils</name>
+    <description>Utilities for socket communication</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-logging-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>3.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
new file mode 100644
index 0000000..172c593
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumer;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base {@link Runnable} for channel readers. Each invocation of {@link #run()}
+ * takes an empty buffer from the {@link BufferPool}, lets the subclass fill it
+ * from the channel behind the reader's {@link SelectionKey}, and hands any
+ * bytes read to the {@link StreamConsumer} created for this reader. The reader
+ * closes itself when its key is no longer valid, the consumer reports it is
+ * finished, end of stream is reached, or reading fails.
+ *
+ * Two readers are considered equal when they are of the same class and have
+ * the same unique id.
+ *
+ * @author none
+ */
+public abstract class AbstractChannelReader implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class);
+    private final String uniqueId;
+    private final SelectionKey key;
+    private final BufferPool bufferPool;
+    private final StreamConsumer consumer;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);//the future on which this reader runs...
+
+    /**
+     * @param id unique id of this reader; also passed to the consumer factory
+     * @param key selection key whose channel this reader reads from
+     * @param empties pool that supplies empty buffers and receives them back
+     * @param consumerFactory factory used to create the consumer of filled
+     * buffers
+     */
+    public AbstractChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+        this.uniqueId = id;
+        this.key = key;
+        this.bufferPool = empties;
+        this.consumer = consumerFactory.newInstance(id);
+        consumer.setReturnBufferQueue(bufferPool);
+    }
+
+    /**
+     * Records the future under which this reader is scheduled so that the
+     * reader can cancel its own scheduling when it closes.
+     *
+     * @param future the future returned when this reader was scheduled
+     */
+    protected void setScheduledFuture(final ScheduledFuture<?> future) {
+        this.future.set(future);
+    }
+
+    /**
+     * @return the future set via {@link #setScheduledFuture(ScheduledFuture)},
+     * or null if none has been set
+     */
+    protected ScheduledFuture<?> getScheduledFuture() {
+        return future.get();
+    }
+
+    /**
+     * @return the selection key this reader was created with
+     */
+    protected SelectionKey getSelectionKey() {
+        return key;
+    }
+
+    /**
+     * @return true once this reader has started closing its stream
+     */
+    public boolean isClosed() {
+        return isClosed.get();
+    }
+
+    /**
+     * Closes this reader at most once: cancels its scheduling (if known),
+     * cancels the key, closes the channel and finally signals end of stream to
+     * the consumer.
+     */
+    private void closeStream() {
+        // compareAndSet makes the closed check and the state change one atomic
+        // step, so only a single caller ever performs the shutdown
+        if (!isClosed.compareAndSet(false, true)) {
+            return;
+        }
+        try {
+            // the future may not have been set yet if this reader ran before
+            // the scheduling code called setScheduledFuture
+            final ScheduledFuture<?> scheduled = future.get();
+            if (scheduled != null) {
+                scheduled.cancel(false);
+            }
+            key.cancel();
+            key.channel().close();
+        } catch (final IOException ioe) {
+            LOGGER.warn("Unable to cleanly close stream due to " + ioe);
+        } finally {
+            consumer.signalEndOfStream();
+        }
+    }
+
+    /**
+     * Allows a subclass to specifically handle how it reads from the given
+     * key's channel into the given buffer.
+     *
+     * @param key the key whose channel should be read
+     * @param buffer the buffer to read into
+     * @return the number of bytes read in the final read cycle. A value of zero
+     * or more indicates the channel is still open but a value of -1 indicates
+     * end of stream.
+     * @throws IOException if reading from the channel fails
+     */
+    protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException;
+
+    /**
+     * Performs one read cycle. Returns without reading when the key is not
+     * readable or no buffer is available; closes the stream when the key is
+     * invalid, the consumer is finished, end of stream is reached or any
+     * exception occurs while reading.
+     */
+    @Override
+    public final void run() {
+        if (!key.isValid() || consumer.isConsumerFinished()) {
+            closeStream();
+            return;
+        }
+        if (!key.isReadable()) {
+            return; // nothing available to read, or we aren't allowed to read due to throttling
+        }
+        ByteBuffer buffer = null;
+        try {
+            buffer = bufferPool.poll();
+            if (buffer == null) {
+                return; // no buffers available - come back later
+            }
+            final int bytesRead = fillBuffer(key, buffer);
+            buffer.flip();
+            if (buffer.remaining() > 0) {
+                consumer.addFilledBuffer(buffer);
+                buffer = null; // clear the reference - the buffer is now the consumer's responsibility
+            } else {
+                buffer.clear();
+                bufferPool.returnBuffer(buffer, 0);
+                buffer = null; // clear the reference - the buffer is back in the pool
+            }
+            if (bytesRead < 0) { // we've reached the end
+                closeStream();
+            }
+        } catch (final Exception e) {
+            closeStream();
+            // pass the exception itself so the stack trace is not lost
+            LOGGER.error("Closed channel reader " + this + " due to " + e, e);
+        } finally {
+            // only non-null when the buffer was neither handed off nor returned above
+            if (buffer != null) {
+                buffer.clear();
+                bufferPool.returnBuffer(buffer, 0);
+            }
+        }
+    }
+
+    @Override
+    public final boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        final AbstractChannelReader rhs = (AbstractChannelReader) obj;
+        // the superclass is Object, so super.equals would be an identity check
+        // that is always false here; compare by id only, matching hashCode
+        return new EqualsBuilder().append(uniqueId, rhs.uniqueId).isEquals();
+    }
+
+    @Override
+    public final int hashCode() {
+        return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
+    }
+
+    @Override
+    public final String toString() {
+        return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
new file mode 100644
index 0000000..a413ad2
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A pool of reusable {@link ByteBuffer}s that also throttles its callers:
+ * {@link #poll()} hands out no buffer while the measured rate of bytes
+ * reported through {@link #returnBuffer(ByteBuffer, int)} is at or above the
+ * configured maximum. When executed as a {@link Runnable} it refreshes the
+ * rate figures and logs them at debug level.
+ *
+ * @author none
+ */
+public class BufferPool implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class);
+    final BlockingQueue<ByteBuffer> bufferPool;
+    private final static double ONE_MB = 1 << 20;
+    private final static double NANOS_PER_SECOND = 1.0e9;
+    // elapsed time is measured with System.nanoTime(), which is unaffected by
+    // wall-clock adjustments and needs no object allocation per sample
+    private long lastRateSampleNanos = System.nanoTime();
+    private final long startNanos = System.nanoTime();
+    double lastRateSampleMBps = -1.0;
+    double overallMBps = -1.0;
+    private long totalBytesExtracted = 0L;
+    private long lastTotalBytesExtracted = 0L;
+    final double maxRateMBps;
+
+    /**
+     * @param bufferCount number of buffers to create
+     * @param bufferCapacity capacity in bytes of each buffer
+     * @param allocateDirect whether to allocate direct buffers
+     * @param maxRateMBps rate, in MB per second, at or above which
+     * {@link #poll()} stops handing out buffers
+     */
+    public BufferPool(final int bufferCount, final int bufferCapacity, final boolean allocateDirect, final double maxRateMBps) {
+        bufferPool = new LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, allocateDirect));
+        this.maxRateMBps = maxRateMBps;
+    }
+
+    /**
+     * Returns the given buffer to the pool - and clears it.
+     *
+     * @param buffer the buffer to give back
+     * @param bytesProcessed number of bytes to add to the running total used
+     * for rate computation
+     * @return the result of adding the buffer to the underlying queue
+     */
+    public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) {
+        totalBytesExtracted += bytesProcessed;
+        buffer.clear();
+        return bufferPool.add(buffer);
+    }
+
+    /**
+     * Enforces the desired rate by restricting access to buffers while over
+     * rate.
+     *
+     * @return an empty buffer, or null if the pool is empty or either the
+     * overall rate or the weighted average of the last sample (70%) and the
+     * overall rate (30%) is at or above the maximum rate
+     */
+    public synchronized ByteBuffer poll() {
+        computeRate();
+        final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 0.3);
+        if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) {
+            return null;
+        }
+        return bufferPool.poll();
+    }
+
+    /**
+     * @return the number of buffers currently in the pool
+     */
+    public int size() {
+        return bufferPool.size();
+    }
+
+    /**
+     * Recomputes the last-sample and overall rates, but only if at least 3/4
+     * of a second has elapsed since the previous sample.
+     */
+    private synchronized void computeRate() {
+        final long nowNanos = System.nanoTime();
+        final double durationSecs = (nowNanos - lastRateSampleNanos) / NANOS_PER_SECOND;
+        if (durationSecs >= 0.75) { //recompute at most every 3/4 second
+            final double totalDurationSecs = (nowNanos - startNanos) / NANOS_PER_SECOND;
+            final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted;
+            lastTotalBytesExtracted = totalBytesExtracted;
+            lastRateSampleNanos = nowNanos;
+            final double bps = ((double) differenceBytes) / durationSecs;
+            final double totalBps = ((double) totalBytesExtracted / totalDurationSecs);
+            lastRateSampleMBps = bps / ONE_MB;
+            overallMBps = totalBps / ONE_MB;
+        }
+    }
+
+    /**
+     * Allocates the requested number of buffers.
+     *
+     * @param bufferCount number of buffers to create
+     * @param bufferCapacity capacity in bytes of each buffer
+     * @param allocateDirect true for direct buffers, false for heap buffers
+     * @return a new list holding the created buffers
+     */
+    public static List<ByteBuffer> createBuffers(final int bufferCount, final int bufferCapacity, final boolean allocateDirect) {
+        final List<ByteBuffer> buffers = new ArrayList<>();
+        for (int i = 0; i < bufferCount; i++) {
+            final ByteBuffer buffer = (allocateDirect) ? ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity);
+            buffers.add(buffer);
+        }
+        return buffers;
+    }
+
+    // synchronized so the rate fields are read under the same lock that guards their updates
+    private synchronized void logChannelReadRates() {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, totalBytesExtracted));
+        }
+    }
+
+    /**
+     * Refreshes the rate figures and logs them at debug level.
+     */
+    @Override
+    public void run() {
+        computeRate();
+        logChannelReadRates();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
new file mode 100644
index 0000000..2ae2c07
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runnable that, until {@link #stop()} is called, alternates between polling
+ * the server socket selector for connections to accept and polling the socket
+ * channel selector for datagram channels that do not have a reader yet. For
+ * every accepted SocketChannel and every new DatagramChannel a reader is
+ * created, attached to the channel's selection key and scheduled with fixed
+ * delay on the provided executor.
+ *
+ * @author none
+ */
+public final class ChannelDispatcher implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
+    private final Selector serverSocketSelector;
+    private final Selector socketChannelSelector;
+    private final ScheduledExecutorService executor;
+    private final BufferPool emptyBuffers;
+    private final StreamConsumerFactory factory;
+    // delay between runs of each scheduled reader; read when a reader is scheduled
+    private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
+    // select timeout in milliseconds
+    private final long timeout;
+    private volatile boolean stop = false;
+    public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
+
+    /**
+     * @param serverSocketSelector selector polled for acceptable server socket
+     * channels
+     * @param socketChannelSelector selector with which accepted socket
+     * channels are registered and which is polled for datagram channels
+     * @param service executor on which the created readers are scheduled
+     * @param factory consumer factory handed to every created reader
+     * @param buffers buffer pool handed to every created reader
+     * @param timeout timeout used for each select call
+     * @param unit unit of the timeout; the value is converted to milliseconds
+     */
+    public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
+            final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
+        this.serverSocketSelector = serverSocketSelector;
+        this.socketChannelSelector = socketChannelSelector;
+        this.executor = service;
+        this.factory = factory;
+        emptyBuffers = buffers;
+        this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+    }
+
+    /**
+     * Sets the delay used when scheduling readers. The value is read at the
+     * time a reader is scheduled, so readers that are already scheduled keep
+     * the delay they were scheduled with.
+     *
+     * @param period the delay between reader runs
+     * @param timeUnit unit of the period; the value is stored in milliseconds
+     */
+    public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
+        channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit));
+    }
+
+    /**
+     * Loops until {@link #stop()} is called, polling both selectors on every
+     * pass. Any exception from a pass is logged at warn level and the loop
+     * continues.
+     */
+    @Override
+    public void run() {
+        while (!stop) {
+            try {
+                selectServerSocketKeys();
+                selectSocketChannelKeys();
+            } catch (final Exception ex) {
+                LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex});
+            }
+        }
+    }
+
+    /*
+     * When ServerSocketChannels are registered with the selector, we want each invocation of this method to loop through
+     * all channels' keys. For each acceptable key a connection is accepted (if one is pending), configured as
+     * non-blocking, registered with the socket channel selector for reads, and given a scheduled SocketChannelReader
+     * which is attached to the new key.
+     *
+     * NOTE(review): if an exception is thrown after accept() succeeds, nothing in this method closes the accepted
+     * channel - confirm whether that is acceptable.
+     *
+     * @throws IOException
+     */
+    private void selectServerSocketKeys() throws IOException {
+        int numSelected = serverSocketSelector.select(timeout);
+        if (numSelected == 0) {
+            return;
+        }
+
+        // for each registered server socket - see if any connections are waiting to be established
+        final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator();
+        while (itr.hasNext()) {
+            SelectionKey serverSocketkey = itr.next();
+            final SelectableChannel channel = serverSocketkey.channel();
+            AbstractChannelReader reader = null;
+            if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) {
+                final ServerSocketChannel ssChannel = (ServerSocketChannel) serverSocketkey.channel();
+                final SocketChannel sChannel = ssChannel.accept();
+                if (sChannel != null) {
+                    sChannel.configureBlocking(false);
+                    final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+                    // the socket's string form serves as the reader id
+                    final String readerId = sChannel.socket().toString();
+                    reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory);
+                    // first run after 10 ms, then with the currently configured delay between runs
+                    final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L,
+                            channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS);
+                    reader.setScheduledFuture(readerFuture);
+                    socketChannelKey.attach(reader);
+                }
+            }
+            itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0.
+            if (reader != null && LOGGER.isDebugEnabled()) {
+                LOGGER.debug(this + " New Connection established.  Server channel: " + channel + " Reader: " + reader);
+            }
+        }
+    }
+
+    /*
+     * When invoking this method, we only want to iterate through the selected keys once. When a key is entered into the
+     * selector's selected key set, select will return a positive value. The next select will return 0 if nothing has
+     * changed. Note that the selected key set is intentionally not changed via a remove operation.
+     *
+     * @throws IOException
+     */
+    private void selectSocketChannelKeys() throws IOException {
+        // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
+        // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
+        int numSelected = socketChannelSelector.select(timeout);
+        if (numSelected == 0) {
+            return;
+        }
+
+        for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
+            final SelectableChannel channel = socketChannelKey.channel();
+            AbstractChannelReader reader = null;
+            // there are 2 kinds of channels in this selector, both of which have their own readers and are executed in
+            // their own threads. We will get here whenever a new SocketChannel is created due to an incoming connection.
+            // However, for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel.
+            // The only way to tell if it's new is the lack of an attachment.
+            if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
+                // datagram readers get a random UUID as their id
+                reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
+                socketChannelKey.attach(reader);
+                final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
+                        TimeUnit.MILLISECONDS);
+                reader.setScheduledFuture(readerFuture);
+            }
+            if (reader != null && LOGGER.isDebugEnabled()) {
+                LOGGER.debug(this + " New Connection established.  Server channel: " + channel + " Reader: " + reader);
+            }
+        }
+
+    }
+
+    /**
+     * Requests that the loop in {@link #run()} end; the flag is checked at the
+     * start of each pass through the loop.
+     */
+    public void stop() {
+        stop = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
new file mode 100644
index 0000000..b0a1cfb
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides the entry point to NIO based socket listeners for NiFi
+ * processors and services. There are 2 supported types of Listeners, Datagram
+ * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
+ * will create the ChannelDispatcher, which is a Runnable and is controlled via
+ * the ScheduledExecutorService, which is also created by this class. The
+ * ChannelDispatcher handles connections to the ServerSocketChannels and creates
+ * the readers associated with the resulting SocketChannels. Additionally, this
+ * creates and manages two Selectors, one for ServerSocketChannels and another
+ * for SocketChannels and DatagramChannels.
+ *
+ * The threading model for this consists of one thread for the
+ * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
+ * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
+ * fixed delay as the others are. It is throttled by the provided timeout value.
+ * Within the ChannelDispatcher there are two blocking operations which will
+ * block for the given timeout each time through the enclosing loop.
+ *
+ * All channels are cached in one of the two Selectors via their SelectionKey.
+ * The serverSocketSelector maintains all the added ServerSocketChannels; the
+ * socketChannelSelector maintains all the added DatagramChannels and the
+ * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
+ * the SocketChannel is injected with the channel's associated reader.
+ *
+ * All ChannelReaders will get throttled by the unavailability of buffers in the
+ * provided BufferPool. This is designed to create back pressure.
+ *
+ * @author none
+ */
+public final class ChannelListener {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
+    private final ScheduledExecutorService executor;
+    private final Selector serverSocketSelector; // used to listen for new connections
+    private final Selector socketChannelSelector; // used to listen on existing connections
+    private final ChannelDispatcher channelDispatcher;
+    private final BufferPool bufferPool;
+    private final int initialBufferPoolSize;
+    private volatile long channelReaderFrequencyMSecs = 50;
+
+    public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
+            TimeUnit unit) throws IOException {
+        this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
+        this.serverSocketSelector = Selector.open();
+        this.socketChannelSelector = Selector.open();
+        this.bufferPool = bufferPool;
+        this.initialBufferPoolSize = bufferPool.size();
+        channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
+                timeout, unit);
+        executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Stores the given period, converted to milliseconds, and forwards the
+     * period to the ChannelDispatcher as its channel reader frequency.
+     *
+     * @param period the scheduling period for channel readers
+     * @param unit the unit of the period
+     */
+    public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) {
+        final long periodMillis = TimeUnit.MILLISECONDS.convert(period, unit);
+        channelReaderFrequencyMSecs = periodMillis;
+        channelDispatcher.setChannelReaderFrequency(period, unit);
+    }
+
+    /**
+     * Adds a server socket channel for listening to connections.
+     *
+     * @param nicIPAddress - if null binds to wildcard address
+     * @param port - port to bind to
+     * @param receiveBufferSize - size of OS receive buffer to request. If less
+     * than 0 then will not be set and OS default will win.
+     * @throws IOException
+     */
+    public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
+            throws IOException {
+        final ServerSocketChannel ssChannel = ServerSocketChannel.open();
+        ssChannel.configureBlocking(false);
+        if (receiveBufferSize > 0) {
+            ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+            final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+            if (actualReceiveBufSize < receiveBufferSize) {
+                LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to "
+                        + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
+                        + "bytes. You may want to consider changing the Operating System's "
+                        + "maximum receive buffer");
+            }
+        }
+        ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+        ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
+        ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
+    }
+
+    /**
+     * Binds to listen for data grams on the given local IPAddress/port
+     *
+     * @param nicIPAddress - if null will listen on wildcard address, which
+     * means datagrams will be received on all local network interfaces.
+     * Otherwise, will bind to the provided IP address associated with some NIC.
+     * @param port - the port to listen on
+     * @param receiveBufferSize - the number of bytes to request for a receive
+     * buffer from OS
+     * @throws IOException
+     */
+    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
+            throws IOException {
+        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
+        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+    }
+
+    /**
+     * Binds to listen for datagrams on the given local IPAddress/port and
+     * restricts receipt of datagrams to those from the provided host and port.
+     * Both must be specified for the restriction to apply; if either is null
+     * this behaves exactly like
+     * {@link #addDatagramChannel(InetAddress, int, int)}. Restricting the
+     * sender improves performance for datagrams coming from a sender that is
+     * known a-priori. If the bound channel cannot be connected and registered
+     * it is closed again before the failure is propagated.
+     *
+     * @param nicIPAddress - if null will listen on wildcard address, which
+     * means datagrams will be received on all local network interfaces.
+     * Otherwise, will bind to the provided IP address associated with some NIC.
+     * @param port - the port to listen on. This is used to provide a well-known
+     * destination for a sender.
+     * @param receiveBufferSize - the number of bytes to request for a receive
+     * buffer from OS
+     * @param sendingHost - the hostname, or IP address, of the sender of
+     * datagrams. Only datagrams from this host will be received. If this is
+     * null no sender restriction is applied.
+     * @param sendingPort - the port used by the sender of datagrams. Only
+     * datagrams from this port will be received. If this is null no sender
+     * restriction is applied.
+     * @throws IOException if the channel cannot be created, bound, connected
+     * or registered
+     */
+    public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
+            final Integer sendingPort) throws IOException {
+
+        if (sendingHost == null || sendingPort == null) {
+            addDatagramChannel(nicIPAddress, port, receiveBufferSize);
+            return;
+        }
+        final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
+        boolean registered = false;
+        try {
+            dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
+            dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+            registered = true;
+        } finally {
+            if (!registered) {
+                // do not leave the port bound if connecting or registering failed
+                try {
+                    dChannel.close();
+                } catch (final IOException ignored) {
+                    // best effort; the original failure is the one worth reporting
+                }
+            }
+        }
+    }
+
+    /**
+     * Opens a non-blocking datagram channel, applies the requested receive
+     * buffer size (when positive), enables SO_REUSEADDR and binds the channel
+     * to the given local address and port. A warning is logged when the
+     * receive buffer actually in effect is smaller than the one requested.
+     * If any step fails the channel is closed before the exception
+     * propagates.
+     *
+     * @param nicIPAddress - the local address to bind to; passed straight to
+     * {@link InetSocketAddress#InetSocketAddress(InetAddress, int)}, for which
+     * null means the wildcard address
+     * @param port - the local port to bind to
+     * @param receiveBufferSize - the number of bytes to request for the
+     * receive buffer; values of zero or less leave the default untouched
+     * @return the bound, non-blocking channel
+     * @throws IOException if the channel cannot be opened, configured or bound
+     */
+    private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
+            throws IOException {
+        final DatagramChannel dChannel = DatagramChannel.open();
+        try {
+            dChannel.configureBlocking(false);
+            if (receiveBufferSize > 0) {
+                dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+                // read the option back to see what size is actually in effect
+                final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+                if (actualReceiveBufSize < receiveBufferSize) {
+                    LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to "
+                            + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
+                            + " bytes. You may want to consider changing the Operating System's "
+                            + "maximum receive buffer");
+                }
+            }
+            dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+            dChannel.bind(new InetSocketAddress(nicIPAddress, port));
+            return dChannel;
+        } catch (final IOException | RuntimeException e) {
+            // the caller never sees this channel, so it has to be released here
+            IOUtils.closeQuietly(dChannel);
+            throw e;
+        }
+    }
+
+    public void shutdown(final long period, final TimeUnit timeUnit) {
+        channelDispatcher.stop();
+        for (SelectionKey selectionKey : socketChannelSelector.keys()) {
+            final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment();
+            selectionKey.cancel();
+            if (reader != null) {
+                while (!reader.isClosed()) {
+                    try {
+                        Thread.sleep(channelReaderFrequencyMSecs);
+                    } catch (InterruptedException e) {
+                    }
+                }
+                final ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
+                readerFuture.cancel(false);
+            }
+            IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist...
+        }
+        IOUtils.closeQuietly(socketChannelSelector);
+
+        for (SelectionKey selectionKey : serverSocketSelector.keys()) {
+            selectionKey.cancel();
+            IOUtils.closeQuietly(selectionKey.channel());
+        }
+        IOUtils.closeQuietly(serverSocketSelector);
+        executor.shutdown();
+        try {
+            executor.awaitTermination(period, timeUnit);
+        } catch (final InterruptedException ex) {
+            LOGGER.warn("Interrupted while trying to shutdown executor");
+        }
+        final int currentBufferPoolSize = bufferPool.size();
+        final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
+                + " Current buffer count=" + currentBufferPoolSize
+                + " Could indicate a buffer leak.  Ensure all consumers are executed until they complete." : "";
+        LOGGER.info("Channel listener shutdown. " + warning);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
new file mode 100644
index 0000000..1eb5c7e
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+/**
+ * Channel reader that fills buffers with datagrams received from the
+ * {@link DatagramChannel} behind a selection key.
+ */
+public final class DatagramChannelReader extends AbstractChannelReader {
+
+    /**
+     * Largest UDP payload that fits in an IPv4 packet: 65,535 bytes minus the
+     * 8 byte UDP header and the 20 byte IP header.
+     */
+    public static final int MAX_UDP_PACKET_SIZE = 65507;
+
+    public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+        super(id, key, empties, consumerFactory);
+    }
+
+    /**
+     * Will receive UDP data from channel and won't receive anything unless the
+     * given buffer has enough space for at least one full max udp packet.
+     * Datagrams are received back to back into the buffer until the buffer
+     * can no longer hold a maximum size packet, the key stops being valid or
+     * readable, or the channel has no datagram immediately available.
+     *
+     * @param key the selection key whose channel is a {@link DatagramChannel}
+     * @param buffer the buffer to append received datagrams to
+     * @return the number of bytes added to the buffer by this call
+     * @throws IOException if receiving from the channel fails
+     */
+    @Override
+    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
+        final DatagramChannel dChannel = (DatagramChannel) key.channel();
+        final int initialBufferPosition = buffer.position();
+        // ">=": a buffer with exactly MAX_UDP_PACKET_SIZE bytes free can still take a full
+        // packet. receive() silently discards whatever does not fit, hence the guard.
+        while (buffer.remaining() >= MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
+            // a null source address means no datagram was immediately available
+            if (dChannel.receive(buffer) == null) {
+                break;
+            }
+        }
+        return buffer.position() - initialBufferPosition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
new file mode 100644
index 0000000..db2c102
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+/**
+ * Channel reader that fills buffers with bytes read from the
+ * {@link SocketChannel} behind a selection key.
+ */
+public final class SocketChannelReader extends AbstractChannelReader {
+
+    public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+        super(id, key, empties, consumerFactory);
+    }
+
+    /**
+     * Reads TCP data from the socket channel of the given key into the given
+     * buffer, repeating the read for as long as the key is valid and readable
+     * and each read delivers at least one byte.
+     *
+     * @param key the selection key whose channel is a {@link SocketChannel}
+     * @param buffer the buffer to read into
+     * @return the result of the last read that was attempted, not the total
+     * number of bytes placed in the buffer: zero when no read was attempted
+     * or the last read delivered nothing, a negative value when the channel
+     * reported end-of-stream
+     * @throws IOException if reading from the channel fails
+     */
+    @Override
+    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
+        final SocketChannel channel = (SocketChannel) key.channel();
+        int lastReadResult = 0;
+        do {
+            if (!key.isValid() || !key.isReadable()) {
+                break;
+            }
+            lastReadResult = channel.read(buffer);
+        } while (lastReadResult > 0);
+        return lastReadResult;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
new file mode 100644
index 0000000..fce59c6
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Base class for stream consumers. It queues the filled buffers handed to it,
+ * passes them one at a time to {@link #processBuffer(ByteBuffer)} and returns
+ * every buffer to the buffer pool afterwards. Two consumers of the same class
+ * are equal when their ids are equal.
+ */
+public abstract class AbstractStreamConsumer implements StreamConsumer {
+
+    private final String uniqueId;
+    private BufferPool bufferPool = null;
+    private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
+    private final AtomicBoolean streamEnded = new AtomicBoolean(false);
+    private final AtomicBoolean consumerEnded = new AtomicBoolean(false);
+
+    /**
+     * @param id the identifier reported by {@link #getId()} and used for
+     * equality
+     */
+    public AbstractStreamConsumer(final String id) {
+        uniqueId = id;
+    }
+
+    @Override
+    public final void setReturnBufferQueue(final BufferPool returnQueue) {
+        bufferPool = returnQueue;
+    }
+
+    /**
+     * Queues the buffer for processing, or hands it straight back to the
+     * buffer pool (cleared) when this consumer has already finished.
+     *
+     * @param buffer the filled buffer
+     */
+    @Override
+    public final void addFilledBuffer(final ByteBuffer buffer) {
+        if (isConsumerFinished()) {
+            buffer.clear();
+            // NOTE(review): remaining() is evaluated after clear(), so the buffer's capacity is what
+            // gets reported here, whereas process() reports the bytes that were pending before the
+            // clear. Confirm which value BufferPool.returnBuffer expects for a discarded buffer.
+            bufferPool.returnBuffer(buffer, buffer.remaining());
+        } else {
+            filledBuffers.add(buffer);
+        }
+    }
+
+    /**
+     * Processes at most one queued buffer. Does nothing once the consumer is
+     * finished. When end of stream has been signalled and no buffers remain,
+     * marks the consumer finished and invokes {@link #onConsumerDone()}.
+     *
+     * @throws IOException if {@link #processBuffer(ByteBuffer)} fails; the
+     * buffer is still cleared and returned to the pool
+     */
+    @Override
+    public final void process() throws IOException {
+        if (isConsumerFinished()) {
+            return;
+        }
+        if (streamEnded.get() && filledBuffers.isEmpty()) {
+            consumerEnded.set(true);
+            onConsumerDone();
+            return;
+        }
+        final ByteBuffer buffer = filledBuffers.poll();
+        if (buffer != null) {
+            // captured up front because processBuffer may consume the buffer
+            final int bytesToProcess = buffer.remaining();
+            try {
+                processBuffer(buffer);
+            } finally {
+                buffer.clear();
+                bufferPool.returnBuffer(buffer, bytesToProcess);
+            }
+        }
+    }
+
+    /**
+     * Handles the content of one filled buffer. The buffer is cleared and
+     * returned to the pool by the caller once this method returns or throws.
+     *
+     * @param buffer the buffer to process
+     * @throws IOException if the content cannot be processed
+     */
+    protected abstract void processBuffer(ByteBuffer buffer) throws IOException;
+
+    @Override
+    public final void signalEndOfStream() {
+        streamEnded.set(true);
+    }
+
+    /**
+     * Convenience method that is called when the consumer is done processing
+     * based on being told the signal is end of stream and has processed all
+     * given buffers.
+     */
+    protected void onConsumerDone() {
+    }
+
+    @Override
+    public final boolean isConsumerFinished() {
+        return consumerEnded.get();
+    }
+
+    @Override
+    public final String getId() {
+        return uniqueId;
+    }
+
+    @Override
+    public final boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
+        // No appendSuper(super.equals(obj)): the superclass is Object, whose equals is an identity
+        // check that is always false at this point and would make the id comparison meaningless.
+        return new EqualsBuilder().append(uniqueId, rhs.uniqueId).isEquals();
+    }
+
+    @Override
+    public final int hashCode() {
+        return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
+    }
+
+    @Override
+    public final String toString() {
+        return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
new file mode 100644
index 0000000..d75b7d7
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.nifi.io.nio.BufferPool;
+
+/**
+ * A StreamConsumer must be thread safe. It may be accessed concurrently by a
+ * thread providing data to process and another thread that is processing that
+ * data.
+ */
+public interface StreamConsumer {
+
+    /**
+     * Will be called once just after construction. It provides the queue to
+     * which processed and emptied and cleared buffers must be returned. For
+     * each time <code>addFilledBuffer</code> is called there should be an
+     * associated add to this given queue. If not, buffers will run out and all
+     * stream processing will halt. READ THIS!!!
+     *
+     * @param returnQueue the pool that every buffer received through
+     * <code>addFilledBuffer</code> must eventually be returned to
+     */
+    void setReturnBufferQueue(BufferPool returnQueue);
+
+    /**
+     * Will be called by the thread that produces byte buffers with available
+     * data to be processed. If the consumer is finished this should simply
+     * return the given buffer to the return buffer queue (after it is cleared)
+     *
+     * @param buffer a buffer holding data for this consumer to process
+     */
+    void addFilledBuffer(ByteBuffer buffer);
+
+    /**
+     * Will be called by the thread that executes the consumption of data. May
+     * be called many times though once <code>isConsumerFinished</code> returns
+     * true this method will likely do nothing.
+     *
+     * @throws java.io.IOException if the data cannot be processed
+     */
+    void process() throws IOException;
+
+    /**
+     * Called once the end of the input stream is detected
+     */
+    void signalEndOfStream();
+
+    /**
+     * If true signals the consumer is done consuming data and will not process
+     * any more buffers.
+     *
+     * @return true once the consumer will not process any more buffers
+     */
+    boolean isConsumerFinished();
+
+    /**
+     * Uniquely identifies the consumer
+     *
+     * @return the identifier of this consumer
+     */
+    String getId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
new file mode 100644
index 0000000..df298d5
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+/**
+ * Factory for {@link StreamConsumer} instances. The channel readers in
+ * <code>org.apache.nifi.io.nio</code> take an implementation of this
+ * interface as a constructor argument.
+ */
+public interface StreamConsumerFactory {
+
+    /**
+     * Supplies a consumer for the stream with the given id.
+     *
+     * @param streamId the identifier of the stream; presumably also used as
+     * the id of the returned consumer - confirm against the implementations
+     * @return the consumer for that stream
+     */
+    StreamConsumer newInstance(String streamId);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
new file mode 100644
index 0000000..7ed5ad4
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.nifi.util.NiFiProperties;
+
+/**
+ * Builds {@link SSLContext} instances from the keystore and truststore named
+ * in a {@link NiFiProperties} instance. The stores are loaded once, in the
+ * constructor; every call to {@link #createSslContext()} reuses the resulting
+ * key and trust managers.
+ */
+public class SSLContextFactory {
+
+    private final String keystore;
+    private final char[] keystorePass;
+    private final String keystoreType;
+    private final String truststore;
+    private final char[] truststorePass;
+    private final String truststoreType;
+
+    private final KeyManager[] keyManagers;
+    private final TrustManager[] trustManagers;
+
+    /**
+     * Reads the keystore and truststore locations, passwords and types from
+     * the given properties, loads both stores from disk and prepares the key
+     * and trust managers.
+     *
+     * @param properties the properties holding the security settings
+     * @throws NoSuchAlgorithmException if a required algorithm is unavailable
+     * @throws CertificateException if a certificate in a store cannot be loaded
+     * @throws FileNotFoundException if a store file does not exist
+     * @throws IOException if a store file cannot be read
+     * @throws KeyStoreException if a store type is not supported
+     * @throws UnrecoverableKeyException if a key cannot be recovered with the
+     * keystore password
+     */
+    public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
+        keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
+        keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
+        keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);
+
+        truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
+        truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
+        truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);
+
+        // prepare the keystore; the stream is closed as soon as the store is loaded
+        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+        try (final FileInputStream keyStoreStream = new FileInputStream(keystore)) {
+            keyStore.load(keyStoreStream, keystorePass);
+        }
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePass);
+
+        // prepare the truststore; the stream is closed as soon as the store is loaded
+        final KeyStore trustStore = KeyStore.getInstance(truststoreType);
+        try (final FileInputStream trustStoreStream = new FileInputStream(truststore)) {
+            trustStore.load(trustStoreStream, truststorePass);
+        }
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(trustStore);
+
+        keyManagers = keyManagerFactory.getKeyManagers();
+        trustManagers = trustManagerFactory.getTrustManagers();
+    }
+
+    // Converts a password to the char[] form the KeyStore API expects; null stays null.
+    private static char[] getPass(final String password) {
+        return password == null ? null : password.toCharArray();
+    }
+
+    /**
+     * Creates a new TLS SSLContext initialized with the key and trust
+     * managers prepared by the constructor.
+     *
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException
+     * @throws java.io.IOException
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.cert.CertificateException
+     * @throws java.security.UnrecoverableKeyException
+     * @throws java.security.KeyManagementException
+     */
+    public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+
+        // initialize the ssl context
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(keyManagers, trustManagers, new SecureRandom());
+        // NOTE(review): getDefaultSSLParameters() is documented to return a copy, so this call
+        // does not make the context require client authentication. Callers that need it must
+        // set it on the SSLEngine / SSLServerSocket they create from this context.
+        sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+
+        return sslContext;
+
+    }
+}