You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:30:05 UTC
[42/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java b/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
new file mode 100644
index 0000000..aa6f8f3
--- /dev/null
+++ b/commons/nifi-properties/src/main/java/org/apache/nifi/util/StringUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+/**
+ * String Utils based on the Apache Commons Lang String Utils.
+ * These simple util methods here allow us to avoid a dependency in the core
+ */
+public class StringUtils {
+
+ public static final String EMPTY = "";
+
+ public static boolean isBlank(final String str) {
+ if (str == null || str.isEmpty()) {
+ return true;
+ }
+ for (int i = 0; i < str.length(); i++) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean isEmpty(final String str) {
+ return str == null || str.isEmpty();
+ }
+
+ public static boolean startsWith(final String str, final String prefix) {
+ if (str == null || prefix == null) {
+ return (str == null && prefix == null);
+ }
+ if (prefix.length() > str.length()) {
+ return false;
+ }
+ return str.regionMatches(false, 0, prefix, 0, prefix.length());
+ }
+
+ public static String substringAfter(final String str, final String separator) {
+ if (isEmpty(str)) {
+ return str;
+ }
+ if (separator == null) {
+ return EMPTY;
+ }
+ int pos = str.indexOf(separator);
+ if (pos == -1) {
+ return EMPTY;
+ }
+ return str.substring(pos + separator.length());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/pom.xml b/commons/nifi-security-utils/pom.xml
new file mode 100644
index 0000000..76e9ac1
--- /dev/null
+++ b/commons/nifi-security-utils/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-security-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>NiFi Security Utils</name>
+ <description>Contains security functionality.</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.2</version>
+ </dependency>
+ </dependencies>
+</project>
+
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
new file mode 100644
index 0000000..087d891
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CertificateUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(CertificateUtils.class);
+
+ /**
+ * Returns true if the given keystore can be loaded using the given keystore
+ * type and password. Returns false otherwise.
+ * @param keystore
+ * @param keystoreType
+ * @param password
+ * @return
+ */
+ public static boolean isStoreValid(final URL keystore, final KeystoreType keystoreType, final char[] password) {
+
+ if (keystore == null) {
+ throw new IllegalArgumentException("keystore may not be null");
+ } else if (keystoreType == null) {
+ throw new IllegalArgumentException("keystore type may not be null");
+ } else if (password == null) {
+ throw new IllegalArgumentException("password may not be null");
+ }
+
+ BufferedInputStream bis = null;
+ final KeyStore ks;
+ try {
+
+ // load the keystore
+ bis = new BufferedInputStream(keystore.openStream());
+ ks = KeyStore.getInstance(keystoreType.name());
+ ks.load(bis, password);
+
+ return true;
+
+ } catch (Exception e) {
+ return false;
+ } finally {
+ if (bis != null) {
+ try {
+ bis.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close input stream", ioe);
+ }
+ }
+ }
+ }
+
+ /**
+ * Extracts the username from the specified DN. If the username cannot be
+ * extracted because the CN is in an unrecognized format, the entire CN is
+ * returned. If the CN cannot be extracted because the DN is in an
+ * unrecognized format, the entire DN is returned.
+ *
+ * @param dn
+ * @return
+ */
+ public static String extractUsername(String dn) {
+ String username = dn;
+ String cn = "";
+
+ // ensure the dn is specified
+ if (StringUtils.isNotBlank(dn)) {
+
+ // attempt to locate the cn
+ if (dn.startsWith("CN=")) {
+ cn = StringUtils.substringBetween(dn, "CN=", ",");
+ } else if (dn.startsWith("/CN=")) {
+ cn = StringUtils.substringBetween(dn, "CN=", "/");
+ } else if (dn.startsWith("C=") || dn.startsWith("/C=")) {
+ cn = StringUtils.substringAfter(dn, "CN=");
+ } else if (dn.startsWith("/") && StringUtils.contains(dn, "CN=")) {
+ cn = StringUtils.substringAfter(dn, "CN=");
+ }
+
+ // attempt to get the username from the cn
+ if (StringUtils.isNotBlank(cn)) {
+ if (cn.endsWith(")")) {
+ username = StringUtils.substringBetween(cn, "(", ")");
+ } else if (cn.contains(" ")) {
+ username = StringUtils.substringAfterLast(cn, " ");
+ } else {
+ username = cn;
+ }
+ }
+ }
+
+ return username;
+ }
+
+ /**
+ * Returns a list of subject alternative names. Any name that is represented
+ * as a String by X509Certificate.getSubjectAlternativeNames() is converted
+ * to lowercase and returned.
+ *
+ * @param certificate a certificate
+ * @return a list of subject alternative names; list is never null
+ * @throws CertificateParsingException if parsing the certificate failed
+ */
+ public static List<String> getSubjectAlternativeNames(final X509Certificate certificate) throws CertificateParsingException {
+
+ final Collection<List<?>> altNames = certificate.getSubjectAlternativeNames();
+ if (altNames == null) {
+ return new ArrayList<>();
+ }
+
+ final List<String> result = new ArrayList<>();
+ for (final List<?> generalName : altNames) {
+ /*
+ * generalName has the name type as the first element a String or
+ * byte array for the second element. We return any general names
+ * that are String types.
+ *
+ * We don't inspect the numeric name type because some certificates
+ * incorrectly put IPs and DNS names under the wrong name types.
+ */
+ final Object value = generalName.get(1);
+ if (value instanceof String) {
+ result.add(((String) value).toLowerCase());
+ }
+
+ }
+
+ return result;
+ }
+
+ private CertificateUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
new file mode 100644
index 0000000..741fdde
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/EncryptionMethod.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Enumeration capturing essential information about the various encryption
+ * methods that might be supported.
+ *
+ * @author none
+ */
+public enum EncryptionMethod {
+
+ MD5_128AES("PBEWITHMD5AND128BITAES-CBC-OPENSSL", "BC", false),
+ MD5_256AES("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", false),
+ SHA1_RC2("PBEWITHSHA1ANDRC2", "BC", false),
+ SHA1_DES("PBEWITHSHA1ANDDES", "BC", false),
+ MD5_192AES("PBEWITHMD5AND192BITAES-CBC-OPENSSL", "BC", false),
+ MD5_DES("PBEWITHMD5ANDDES", "BC", false),
+ MD5_RC2("PBEWITHMD5ANDRC2", "BC", false),
+ SHA_192AES("PBEWITHSHAAND192BITAES-CBC-BC", "BC", true),
+ SHA_40RC4("PBEWITHSHAAND40BITRC4", "BC", true),
+ SHA256_128AES("PBEWITHSHA256AND128BITAES-CBC-BC", "BC", true),
+ SHA_128RC2("PBEWITHSHAAND128BITRC2-CBC", "BC", true),
+ SHA_128AES("PBEWITHSHAAND128BITAES-CBC-BC", "BC", true),
+ SHA256_192AES("PBEWITHSHA256AND192BITAES-CBC-BC", "BC", true),
+ SHA_2KEYTRIPLEDES("PBEWITHSHAAND2-KEYTRIPLEDES-CBC", "BC", true),
+ SHA256_256AES("PBEWITHSHA256AND256BITAES-CBC-BC", "BC", true),
+ SHA_40RC2("PBEWITHSHAAND40BITRC2-CBC", "BC", true),
+ SHA_256AES("PBEWITHSHAAND256BITAES-CBC-BC", "BC", true),
+ SHA_3KEYTRIPLEDES("PBEWITHSHAAND3-KEYTRIPLEDES-CBC", "BC", true),
+ SHA_TWOFISH("PBEWITHSHAANDTWOFISH-CBC", "BC", true),
+ SHA_128RC4("PBEWITHSHAAND128BITRC4", "BC", true);
+ private final String algorithm;
+ private final String provider;
+ private final boolean unlimitedStrength;
+
+ EncryptionMethod(String algorithm, String provider, boolean unlimitedStrength) {
+ this.algorithm = algorithm;
+ this.provider = provider;
+ this.unlimitedStrength = unlimitedStrength;
+ }
+
+ public String getProvider() {
+ return provider;
+ }
+
+ public String getAlgorithm() {
+ return algorithm;
+ }
+
+ /**
+ * @return true if algorithm requires unlimited strength policies
+ */
+ public boolean isUnlimitedStrength() {
+ return unlimitedStrength;
+ }
+
+ @Override
+ public String toString() {
+ final ToStringBuilder builder = new ToStringBuilder(this);
+ ToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
+ builder.append("algorithm name", algorithm);
+ builder.append("Requires unlimited strength JCE policy", unlimitedStrength);
+ builder.append("Algorithm Provider", provider);
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
new file mode 100644
index 0000000..18574bb
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/KeystoreType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+/**
+ * Keystore types.
+ */
+public enum KeystoreType {
+
+ PKCS12,
+ JKS;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
new file mode 100644
index 0000000..9abfcc3
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SecurityStoreTypes.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.PrintWriter;
+import java.io.Writer;
+
+/**
+ * Types of security stores and their related Java system properties.
+ */
+public enum SecurityStoreTypes {
+
+ TRUSTSTORE(
+ "javax.net.ssl.trustStore",
+ "javax.net.ssl.trustStorePassword",
+ "javax.net.ssl.trustStoreType"),
+ KEYSTORE(
+ "javax.net.ssl.keyStore",
+ "javax.net.ssl.keyStorePassword",
+ "javax.net.ssl.keyStoreType");
+
+ /**
+ * Logs the keystore and truststore Java system property values to the given
+ * writer. It logPasswords is true, then the keystore and truststore
+ * password property values are logged.
+ *
+ * @param writer a writer to log to
+ *
+ * @param logPasswords true if passwords should be logged; false otherwise
+ */
+ public static void logProperties(final Writer writer,
+ final boolean logPasswords) {
+ if (writer == null) {
+ return;
+ }
+
+ PrintWriter pw = new PrintWriter(writer);
+
+ // keystore properties
+ pw.println(
+ KEYSTORE.getStoreProperty() + " = " + System.getProperty(KEYSTORE.getStoreProperty()));
+
+ if (logPasswords) {
+ pw.println(
+ KEYSTORE.getStorePasswordProperty() + " = "
+ + System.getProperty(KEYSTORE.getStoreProperty()));
+ }
+
+ pw.println(
+ KEYSTORE.getStoreTypeProperty() + " = "
+ + System.getProperty(KEYSTORE.getStoreTypeProperty()));
+
+ // truststore properties
+ pw.println(
+ TRUSTSTORE.getStoreProperty() + " = "
+ + System.getProperty(TRUSTSTORE.getStoreProperty()));
+
+ if (logPasswords) {
+ pw.println(
+ TRUSTSTORE.getStorePasswordProperty() + " = "
+ + System.getProperty(TRUSTSTORE.getStoreProperty()));
+ }
+
+ pw.println(
+ TRUSTSTORE.getStoreTypeProperty() + " = "
+ + System.getProperty(TRUSTSTORE.getStoreTypeProperty()));
+ pw.flush();
+ }
+
+ /**
+ * the Java system property for setting the keystore (or truststore) path
+ */
+ private String storeProperty = "";
+
+ /**
+ * the Java system property for setting the keystore (or truststore)
+ * password
+ */
+ private String storePasswordProperty = "";
+
+ /**
+ * the Java system property for setting the keystore (or truststore) type
+ */
+ private String storeTypeProperty = "";
+
+ /**
+ * Creates an instance.
+ *
+ * @param storeProperty the Java system property for setting the keystore (
+ * or truststore) path
+ * @param storePasswordProperty the Java system property for setting the
+ * keystore (or truststore) password
+ * @param storeTypeProperty the Java system property for setting the
+ * keystore (or truststore) type
+ */
+ SecurityStoreTypes(final String storeProperty,
+ final String storePasswordProperty,
+ final String storeTypeProperty) {
+ this.storeProperty = storeProperty;
+ this.storePasswordProperty = storePasswordProperty;
+ this.storeTypeProperty = storeTypeProperty;
+ }
+
+ /**
+ * Returns the keystore (or truststore) property.
+ *
+ * @return the keystore (or truststore) property
+ */
+ public String getStoreProperty() {
+ return storeProperty;
+ }
+
+ /**
+ * Returns the keystore (or truststore) password property.
+ *
+ * @return the keystore (or truststore) password property
+ */
+ public String getStorePasswordProperty() {
+ return storePasswordProperty;
+ }
+
+ /**
+ * Returns the keystore (or truststore) type property.
+ *
+ * @return the keystore (or truststore) type property
+ */
+ public String getStoreTypeProperty() {
+ return storeTypeProperty;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
----------------------------------------------------------------------
diff --git a/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
new file mode 100644
index 0000000..2371b0c
--- /dev/null
+++ b/commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/SslContextFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+/**
+ * A factory for creating SSL contexts using the application's security
+ * properties.
+ *
+ * @author unattributed
+ */
+public final class SslContextFactory {
+
+ public static enum ClientAuth {
+
+ WANT,
+ REQUIRED,
+ NONE
+ }
+
+ /**
+ * Creates a SSLContext instance using the given information.
+ *
+ * @param keystore the full path to the keystore
+ * @param keystorePasswd the keystore password
+ * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
+ * @param truststore the full path to the truststore
+ * @param truststorePasswd the truststore password
+ * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
+ * @param clientAuth the type of client authentication
+ *
+ * @return a SSLContext instance
+ * @throws java.security.KeyStoreException
+ * @throws java.io.IOException
+ * @throws java.security.NoSuchAlgorithmException
+ * @throws java.security.cert.CertificateException
+ * @throws java.security.UnrecoverableKeyException
+ * @throws java.security.KeyManagementException
+ */
+ public static SSLContext createSslContext(
+ final String keystore, final char[] keystorePasswd, final String keystoreType,
+ final String truststore, final char[] truststorePasswd, final String truststoreType,
+ final ClientAuth clientAuth)
+ throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+ UnrecoverableKeyException, KeyManagementException {
+
+ // prepare the keystore
+ final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+ try (final InputStream keyStoreStream = new FileInputStream(keystore)) {
+ keyStore.load(keyStoreStream, keystorePasswd);
+ }
+ final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, keystorePasswd);
+
+ // prepare the truststore
+ final KeyStore trustStore = KeyStore.getInstance(truststoreType);
+ try (final InputStream trustStoreStream = new FileInputStream(truststore)) {
+ trustStore.load(trustStoreStream, truststorePasswd);
+ }
+ final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(trustStore);
+
+ // initialize the ssl context
+ final SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+ if (ClientAuth.REQUIRED == clientAuth) {
+ sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+ } else if (ClientAuth.WANT == clientAuth) {
+ sslContext.getDefaultSSLParameters().setWantClientAuth(true);
+ } else {
+ sslContext.getDefaultSSLParameters().setWantClientAuth(false);
+ }
+
+ return sslContext;
+
+ }
+
+ /**
+ * Creates a SSLContext instance using the given information.
+ *
+ * @param keystore the full path to the keystore
+ * @param keystorePasswd the keystore password
+ * @param keystoreType the type of keystore (e.g., PKCS12, JKS)
+ *
+ * @return a SSLContext instance
+ * @throws java.security.KeyStoreException
+ * @throws java.io.IOException
+ * @throws java.security.NoSuchAlgorithmException
+ * @throws java.security.cert.CertificateException
+ * @throws java.security.UnrecoverableKeyException
+ * @throws java.security.KeyManagementException
+ */
+ public static SSLContext createSslContext(
+ final String keystore, final char[] keystorePasswd, final String keystoreType)
+ throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+ UnrecoverableKeyException, KeyManagementException {
+
+ // prepare the keystore
+ final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+ try (final InputStream keyStoreStream = new FileInputStream(keystore)) {
+ keyStore.load(keyStoreStream, keystorePasswd);
+ }
+ final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, keystorePasswd);
+
+ // initialize the ssl context
+ final SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(keyManagerFactory.getKeyManagers(), new TrustManager[0], new SecureRandom());
+
+ return ctx;
+
+ }
+
+ /**
+ * Creates a SSLContext instance using the given information.
+ *
+ * @param truststore the full path to the truststore
+ * @param truststorePasswd the truststore password
+ * @param truststoreType the type of truststore (e.g., PKCS12, JKS)
+ *
+ * @return a SSLContext instance
+ * @throws java.security.KeyStoreException
+ * @throws java.io.IOException
+ * @throws java.security.NoSuchAlgorithmException
+ * @throws java.security.cert.CertificateException
+ * @throws java.security.UnrecoverableKeyException
+ * @throws java.security.KeyManagementException
+ */
+ public static SSLContext createTrustSslContext(
+ final String truststore, final char[] truststorePasswd, final String truststoreType)
+ throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+ UnrecoverableKeyException, KeyManagementException {
+
+ // prepare the truststore
+ final KeyStore trustStore = KeyStore.getInstance(truststoreType);
+ try (final InputStream trustStoreStream = new FileInputStream(truststore)) {
+ trustStore.load(trustStoreStream, truststorePasswd);
+ }
+ final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(trustStore);
+
+ // initialize the ssl context
+ final SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(new KeyManager[0], trustManagerFactory.getTrustManagers(), new SecureRandom());
+
+ return ctx;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/pom.xml b/commons/nifi-socket-utils/pom.xml
new file mode 100644
index 0000000..8e06433
--- /dev/null
+++ b/commons/nifi-socket-utils/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-socket-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>NiFi Socket Utils</name>
+ <description>Utilities for socket communication</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-logging-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-properties</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
new file mode 100644
index 0000000..172c593
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumer;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author none
+ */
+public abstract class AbstractChannelReader implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChannelReader.class);
+ private final String uniqueId;
+ private final SelectionKey key;
+ private final BufferPool bufferPool;
+ private final StreamConsumer consumer;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>(null);//the future on which this reader runs...
+
+ public AbstractChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+ this.uniqueId = id;
+ this.key = key;
+ this.bufferPool = empties;
+ this.consumer = consumerFactory.newInstance(id);
+ consumer.setReturnBufferQueue(bufferPool);
+ }
+
+ protected void setScheduledFuture(final ScheduledFuture<?> future) {
+ this.future.set(future);
+ }
+
+ protected ScheduledFuture<?> getScheduledFuture() {
+ return future.get();
+ }
+
+ protected SelectionKey getSelectionKey() {
+ return key;
+ }
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ private void closeStream() {
+ if (isClosed.get()) {
+ return;
+ }
+ try {
+ isClosed.set(true);
+ future.get().cancel(false);
+ key.cancel();
+ key.channel().close();
+ } catch (final IOException ioe) {
+ LOGGER.warn("Unable to cleanly close stream due to " + ioe);
+ } finally {
+ consumer.signalEndOfStream();
+ }
+ }
+
+ /**
+ * Allows a subclass to specifically handle how it reads from the given
+ * key's channel into the given buffer.
+ *
+ * @param key
+ * @param buffer
+ * @return the number of bytes read in the final read cycle. A value of zero
+ * or more indicates the channel is still open but a value of -1 indicates
+ * end of stream.
+ * @throws IOException
+ */
+ protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) throws IOException;
+
+ @Override
+ public final void run() {
+ if (!key.isValid() || consumer.isConsumerFinished()) {
+ closeStream();
+ return;
+ }
+ if (!key.isReadable()) {
+ return;//there is nothing available to read...or we aren't allow to read due to throttling
+ }
+ ByteBuffer buffer = null;
+ try {
+ buffer = bufferPool.poll();
+ if (buffer == null) {
+ return; // no buffers available - come back later
+ }
+ final int bytesRead = fillBuffer(key, buffer);
+ buffer.flip();
+ if (buffer.remaining() > 0) {
+ consumer.addFilledBuffer(buffer);
+ buffer = null; //clear the reference - is now the consumer's responsiblity
+ } else {
+ buffer.clear();
+ bufferPool.returnBuffer(buffer, 0);
+ buffer = null; //clear the reference - is now back to the queue
+ }
+ if (bytesRead < 0) { //we've reached the end
+ closeStream();
+ }
+ } catch (final Exception ioe) {
+ closeStream();
+ LOGGER.error("Closed channel reader " + this + " due to " + ioe);
+ } finally {
+ if (buffer != null) {
+ buffer.clear();
+ bufferPool.returnBuffer(buffer, 0);
+ }
+ }
+ }
+
+ @Override
+ public final boolean equals(final Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ AbstractChannelReader rhs = (AbstractChannelReader) obj;
+ return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
+ }
+
+ @Override
+ public final int hashCode() {
+ return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
+ }
+
+ @Override
+ public final String toString() {
+ return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
new file mode 100644
index 0000000..a413ad2
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author none
+ */
+public class BufferPool implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BufferPool.class);
+ final BlockingQueue<ByteBuffer> bufferPool;
+ private final static double ONE_MB = 1 << 20;
+ private Calendar lastRateSampleTime = Calendar.getInstance();
+ private final Calendar startTime = Calendar.getInstance();
+ double lastRateSampleMBps = -1.0;
+ double overallMBps = -1.0;
+ private long totalBytesExtracted = 0L;
+ private long lastTotalBytesExtracted = 0L;
+ final double maxRateMBps;
+
+ public BufferPool(final int bufferCount, final int bufferCapacity, final boolean allocateDirect, final double maxRateMBps) {
+ bufferPool = new LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, allocateDirect));
+ this.maxRateMBps = maxRateMBps;
+ }
+
+ /**
+ * Returns the given buffer to the pool - and clears it.
+ *
+ * @param buffer
+ * @param bytesProcessed
+ * @return
+ */
+ public synchronized boolean returnBuffer(ByteBuffer buffer, final int bytesProcessed) {
+ totalBytesExtracted += bytesProcessed;
+ buffer.clear();
+ return bufferPool.add(buffer);
+ }
+
+ //here we enforce the desired rate we want by restricting access to buffers when we're over rate
+ public synchronized ByteBuffer poll() {
+ computeRate();
+ final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 0.3);
+ if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) {
+ return null;
+ }
+ return bufferPool.poll();
+ }
+
+ public int size() {
+ return bufferPool.size();
+ }
+
+ private synchronized void computeRate() {
+ final Calendar now = Calendar.getInstance();
+ final long measurementDurationMillis = now.getTimeInMillis() - lastRateSampleTime.getTimeInMillis();
+ final double duractionSecs = ((double) measurementDurationMillis) / 1000.0;
+ if (duractionSecs >= 0.75) { //recompute every 3/4 second or when we're too fast
+ final long totalDuractionMillis = now.getTimeInMillis() - startTime.getTimeInMillis();
+ final double totalDurationSecs = ((double) totalDuractionMillis) / 1000.0;
+ final long differenceBytes = totalBytesExtracted - lastTotalBytesExtracted;
+ lastTotalBytesExtracted = totalBytesExtracted;
+ lastRateSampleTime = now;
+ final double bps = ((double) differenceBytes) / duractionSecs;
+ final double totalBps = ((double) totalBytesExtracted / totalDurationSecs);
+ lastRateSampleMBps = bps / ONE_MB;
+ overallMBps = totalBps / ONE_MB;
+ }
+ }
+
+ public static List<ByteBuffer> createBuffers(final int bufferCount, final int bufferCapacity, final boolean allocateDirect) {
+ final List<ByteBuffer> buffers = new ArrayList<>();
+ for (int i = 0; i < bufferCount; i++) {
+ final ByteBuffer buffer = (allocateDirect) ? ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity);
+ buffers.add(buffer);
+ }
+ return buffers;
+ }
+
+ private void logChannelReadRates() {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, totalBytesExtracted));
+ }
+ }
+
+ @Override
+ public void run() {
+ computeRate();
+ logChannelReadRates();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
new file mode 100644
index 0000000..2ae2c07
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author none
+ */
+public final class ChannelDispatcher implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ChannelDispatcher.class);
+ private final Selector serverSocketSelector;
+ private final Selector socketChannelSelector;
+ private final ScheduledExecutorService executor;
+ private final BufferPool emptyBuffers;
+ private final StreamConsumerFactory factory;
+ private final AtomicLong channelReaderFrequencyMilliseconds = new AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
+ private final long timeout;
+ private volatile boolean stop = false;
+ public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
+
+ public ChannelDispatcher(final Selector serverSocketSelector, final Selector socketChannelSelector, final ScheduledExecutorService service,
+ final StreamConsumerFactory factory, final BufferPool buffers, final long timeout, final TimeUnit unit) {
+ this.serverSocketSelector = serverSocketSelector;
+ this.socketChannelSelector = socketChannelSelector;
+ this.executor = service;
+ this.factory = factory;
+ emptyBuffers = buffers;
+ this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ }
+
+ public void setChannelReaderFrequency(final long period, final TimeUnit timeUnit) {
+ channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, timeUnit));
+ }
+
+ @Override
+ public void run() {
+ while (!stop) {
+ try {
+ selectServerSocketKeys();
+ selectSocketChannelKeys();
+ } catch (final Exception ex) {
+ LOGGER.warn("Key selection failed: {} Normal during shutdown.", new Object[]{ex});
+ }
+ }
+ }
+
+ /*
+ * When serverSocketsChannels are registered with the selector, want each invoke of this method to loop through all
+ * channels' keys.
+ *
+ * @throws IOException
+ */
+ private void selectServerSocketKeys() throws IOException {
+ int numSelected = serverSocketSelector.select(timeout);
+ if (numSelected == 0) {
+ return;
+ }
+
+ // for each registered server socket - see if any connections are waiting to be established
+ final Iterator<SelectionKey> itr = serverSocketSelector.selectedKeys().iterator();
+ while (itr.hasNext()) {
+ SelectionKey serverSocketkey = itr.next();
+ final SelectableChannel channel = serverSocketkey.channel();
+ AbstractChannelReader reader = null;
+ if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) {
+ final ServerSocketChannel ssChannel = (ServerSocketChannel) serverSocketkey.channel();
+ final SocketChannel sChannel = ssChannel.accept();
+ if (sChannel != null) {
+ sChannel.configureBlocking(false);
+ final SelectionKey socketChannelKey = sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+ final String readerId = sChannel.socket().toString();
+ reader = new SocketChannelReader(readerId, socketChannelKey, emptyBuffers, factory);
+ final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L,
+ channelReaderFrequencyMilliseconds.get(), TimeUnit.MILLISECONDS);
+ reader.setScheduledFuture(readerFuture);
+ socketChannelKey.attach(reader);
+ }
+ }
+ itr.remove(); // do this so that the next select operation returns a positive value; otherwise, it will return 0.
+ if (reader != null && LOGGER.isDebugEnabled()) {
+ LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
+ }
+ }
+ }
+
+ /*
+ * When invoking this method, only want to iterate through the selected keys once. When a key is entered into the selectors
+ * selected key set, select will return a positive value. The next select will return 0 if nothing has changed. Note that
+ * the selected key set is not manually changed via a remove operation.
+ *
+ * @throws IOException
+ */
+ private void selectSocketChannelKeys() throws IOException {
+ // once a channel associated with a key in this selector is 'ready', it causes this select to immediately return.
+ // thus, for each trip through the run() we only get hit with one real timeout...the one in selectServerSocketKeys.
+ int numSelected = socketChannelSelector.select(timeout);
+ if (numSelected == 0) {
+ return;
+ }
+
+ for (SelectionKey socketChannelKey : socketChannelSelector.selectedKeys()) {
+ final SelectableChannel channel = socketChannelKey.channel();
+ AbstractChannelReader reader = null;
+ // there are 2 kinds of channels in this selector, both which have their own readers and are executed in their own
+ // threads. We will get here whenever a new SocketChannel is created due to an incoming connection. However,
+ // for a DatagramChannel we don't want to create a new reader unless it is a new DatagramChannel. The only
+ // way to tell if it's new is the lack of an attachment.
+ if (channel instanceof DatagramChannel && socketChannelKey.attachment() == null) {
+ reader = new DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, emptyBuffers, factory);
+ socketChannelKey.attach(reader);
+ final ScheduledFuture<?> readerFuture = executor.scheduleWithFixedDelay(reader, 10L, channelReaderFrequencyMilliseconds.get(),
+ TimeUnit.MILLISECONDS);
+ reader.setScheduledFuture(readerFuture);
+ }
+ if (reader != null && LOGGER.isDebugEnabled()) {
+ LOGGER.debug(this + " New Connection established. Server channel: " + channel + " Reader: " + reader);
+ }
+ }
+
+ }
+
+ public void stop() {
+ stop = true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
new file mode 100644
index 0000000..b0a1cfb
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides the entry point to NIO based socket listeners for NiFi
+ * processors and services. There are 2 supported types of Listeners, Datagram
+ * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
+ * will create the ChannelDispatcher, which is a Runnable and is controlled via
+ * the ScheduledExecutorService, which is also created by this class. The
+ * ChannelDispatcher handles connections to the ServerSocketChannels and creates
+ * the readers associated with the resulting SocketChannels. Additionally, this
+ * creates and manages two Selectors, one for ServerSocketChannels and another
+ * for SocketChannels and DatagramChannels.
+ *
+ * The threading model for this consists of one thread for the
+ * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
+ * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
+ * fixed delay as the others are. It is throttled by the provided timeout value.
+ * Within the ChannelDispatcher there are two blocking operations which will
+ * block for the given timeout each time through the enclosing loop.
+ *
+ * All channels are cached in one of the two Selectors via their SelectionKey.
+ * The serverSocketSelector maintains all the added ServerSocketChannels; the
+ * socketChannelSelector maintains the all the add DatagramChannels and the
+ * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
+ * the SocketChannel is injected with the channel's associated reader.
+ *
+ * All ChannelReaders will get throttled by the unavailability of buffers in the
+ * provided BufferPool. This is designed to create back pressure.
+ *
+ * @author none
+ */
+public final class ChannelListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ChannelListener.class);
+ private final ScheduledExecutorService executor;
+ private final Selector serverSocketSelector; // used to listen for new connections
+ private final Selector socketChannelSelector; // used to listen on existing connections
+ private final ChannelDispatcher channelDispatcher;
+ private final BufferPool bufferPool;
+ private final int initialBufferPoolSize;
+ private volatile long channelReaderFrequencyMSecs = 50;
+
+ public ChannelListener(final int threadPoolSize, final StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
+ TimeUnit unit) throws IOException {
+ this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); // need to allow for long running ChannelDispatcher thread
+ this.serverSocketSelector = Selector.open();
+ this.socketChannelSelector = Selector.open();
+ this.bufferPool = bufferPool;
+ this.initialBufferPoolSize = bufferPool.size();
+ channelDispatcher = new ChannelDispatcher(serverSocketSelector, socketChannelSelector, executor, consumerFactory, bufferPool,
+ timeout, unit);
+ executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
+ }
+
+ public void setChannelReaderSchedulingPeriod(final long period, final TimeUnit unit) {
+ channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, unit);
+ channelDispatcher.setChannelReaderFrequency(period, unit);
+ }
+
+ /**
+ * Adds a server socket channel for listening to connections.
+ *
+ * @param nicIPAddress - if null binds to wildcard address
+ * @param port - port to bind to
+ * @param receiveBufferSize - size of OS receive buffer to request. If less
+ * than 0 then will not be set and OS default will win.
+ * @throws IOException
+ */
+ public void addServerSocket(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
+ throws IOException {
+ final ServerSocketChannel ssChannel = ServerSocketChannel.open();
+ ssChannel.configureBlocking(false);
+ if (receiveBufferSize > 0) {
+ ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+ final int actualReceiveBufSize = ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+ if (actualReceiveBufSize < receiveBufferSize) {
+ LOGGER.warn(this + " attempted to set TCP Receive Buffer Size to "
+ + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
+ + "bytes. You may want to consider changing the Operating System's "
+ + "maximum receive buffer");
+ }
+ }
+ ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
+ ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
+ }
+
+ /**
+ * Binds to listen for data grams on the given local IPAddress/port
+ *
+ * @param nicIPAddress - if null will listen on wildcard address, which
+ * means datagrams will be received on all local network interfaces.
+ * Otherwise, will bind to the provided IP address associated with some NIC.
+ * @param port - the port to listen on
+ * @param receiveBufferSize - the number of bytes to request for a receive
+ * buffer from OS
+ * @throws IOException
+ */
+ public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
+ throws IOException {
+ final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
+ dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+ }
+
+ /**
+ * Binds to listen for data grams on the given local IPAddress/port and
+ * restricts receipt of datagrams to those from the provided host and port,
+ * must specify both. This improves performance for datagrams coming from a
+ * sender that is known a-priori.
+ *
+ * @param nicIPAddress - if null will listen on wildcard address, which
+ * means datagrams will be received on all local network interfaces.
+ * Otherwise, will bind to the provided IP address associated with some NIC.
+ * @param port - the port to listen on. This is used to provide a well-known
+ * destination for a sender.
+ * @param receiveBufferSize - the number of bytes to request for a receive
+ * buffer from OS
+ * @param sendingHost - the hostname, or IP address, of the sender of
+ * datagrams. Only datagrams from this host will be received. If this is
+ * null the wildcard ip is used, which means datagrams may be received from
+ * any network interface on the local host.
+ * @param sendingPort - the port used by the sender of datagrams. Only
+ * datagrams from this port will be received.
+ * @throws IOException
+ */
+ public void addDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize, final String sendingHost,
+ final Integer sendingPort) throws IOException {
+
+ if (sendingHost == null || sendingPort == null) {
+ addDatagramChannel(nicIPAddress, port, receiveBufferSize);
+ return;
+ }
+ final DatagramChannel dChannel = createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
+ dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
+ dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+ }
+
+ private DatagramChannel createAndBindDatagramChannel(final InetAddress nicIPAddress, final int port, final int receiveBufferSize)
+ throws IOException {
+ final DatagramChannel dChannel = DatagramChannel.open();
+ dChannel.configureBlocking(false);
+ if (receiveBufferSize > 0) {
+ dChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+ final int actualReceiveBufSize = dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+ if (actualReceiveBufSize < receiveBufferSize) {
+ LOGGER.warn(this + " attempted to set UDP Receive Buffer Size to "
+ + receiveBufferSize + " bytes but could only set to " + actualReceiveBufSize
+ + "bytes. You may want to consider changing the Operating System's "
+ + "maximum receive buffer");
+ }
+ }
+ dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ dChannel.bind(new InetSocketAddress(nicIPAddress, port));
+ return dChannel;
+ }
+
+ public void shutdown(final long period, final TimeUnit timeUnit) {
+ channelDispatcher.stop();
+ for (SelectionKey selectionKey : socketChannelSelector.keys()) {
+ final AbstractChannelReader reader = (AbstractChannelReader) selectionKey.attachment();
+ selectionKey.cancel();
+ if (reader != null) {
+ while (!reader.isClosed()) {
+ try {
+ Thread.sleep(channelReaderFrequencyMSecs);
+ } catch (InterruptedException e) {
+ }
+ }
+ final ScheduledFuture<?> readerFuture = reader.getScheduledFuture();
+ readerFuture.cancel(false);
+ }
+ IOUtils.closeQuietly(selectionKey.channel()); // should already be closed via reader, but if reader did not exist...
+ }
+ IOUtils.closeQuietly(socketChannelSelector);
+
+ for (SelectionKey selectionKey : serverSocketSelector.keys()) {
+ selectionKey.cancel();
+ IOUtils.closeQuietly(selectionKey.channel());
+ }
+ IOUtils.closeQuietly(serverSocketSelector);
+ executor.shutdown();
+ try {
+ executor.awaitTermination(period, timeUnit);
+ } catch (final InterruptedException ex) {
+ LOGGER.warn("Interrupted while trying to shutdown executor");
+ }
+ final int currentBufferPoolSize = bufferPool.size();
+ final String warning = (currentBufferPoolSize != initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
+ + " Current buffer count=" + currentBufferPoolSize
+ + " Could indicate a buffer leak. Ensure all consumers are executed until they complete." : "";
+ LOGGER.info("Channel listener shutdown. " + warning);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
new file mode 100644
index 0000000..1eb5c7e
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+/**
+ *
+ * @author none
+ */
+public final class DatagramChannelReader extends AbstractChannelReader {
+
+ public static final int MAX_UDP_PACKET_SIZE = 65507;
+
+ public DatagramChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+ super(id, key, empties, consumerFactory);
+ }
+
+ /**
+ * Will receive UDP data from channel and won't receive anything unless the
+ * given buffer has enough space for at least one full max udp packet.
+ *
+ * @param key
+ * @param buffer
+ * @return
+ * @throws IOException
+ */
+ @Override
+ protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
+ final DatagramChannel dChannel = (DatagramChannel) key.channel();
+ final int initialBufferPosition = buffer.position();
+ while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && key.isReadable()) {
+ if (dChannel.receive(buffer) == null) {
+ break;
+ }
+ }
+ return buffer.position() - initialBufferPosition;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
new file mode 100644
index 0000000..db2c102
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+/**
+ *
+ * @author none
+ */
+public final class SocketChannelReader extends AbstractChannelReader {
+
+ public SocketChannelReader(final String id, final SelectionKey key, final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+ super(id, key, empties, consumerFactory);
+ }
+
+ /**
+ * Receives TCP data from the socket channel for the given key.
+ *
+ * @param key
+ * @param buffer
+ * @return
+ * @throws IOException
+ */
+ @Override
+ protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) throws IOException {
+ int bytesRead = 0;
+ final SocketChannel sChannel = (SocketChannel) key.channel();
+ while (key.isValid() && key.isReadable()) {
+ bytesRead = sChannel.read(buffer);
+ if (bytesRead <= 0) {
+ break;
+ }
+ }
+ return bytesRead;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
new file mode 100644
index 0000000..fce59c6
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ *
+ * @author none
+ */
+public abstract class AbstractStreamConsumer implements StreamConsumer {
+
+ private final String uniqueId;
+ private BufferPool bufferPool = null;
+ private final BlockingQueue<ByteBuffer> filledBuffers = new LinkedBlockingQueue<>();
+ private final AtomicBoolean streamEnded = new AtomicBoolean(false);
+ private final AtomicBoolean consumerEnded = new AtomicBoolean(false);
+
+ public AbstractStreamConsumer(final String id) {
+ uniqueId = id;
+ }
+
+ @Override
+ public final void setReturnBufferQueue(final BufferPool returnQueue) {
+ bufferPool = returnQueue;
+ }
+
+ @Override
+ public final void addFilledBuffer(final ByteBuffer buffer) {
+ if (isConsumerFinished()) {
+ buffer.clear();
+ bufferPool.returnBuffer(buffer, buffer.remaining());
+ } else {
+ filledBuffers.add(buffer);
+ }
+ }
+
+ @Override
+ public final void process() throws IOException {
+ if (isConsumerFinished()) {
+ return;
+ }
+ if (streamEnded.get() && filledBuffers.isEmpty()) {
+ consumerEnded.set(true);
+ onConsumerDone();
+ return;
+ }
+ final ByteBuffer buffer = filledBuffers.poll();
+ if (buffer != null) {
+ final int bytesToProcess = buffer.remaining();
+ try {
+ processBuffer(buffer);
+ } finally {
+ buffer.clear();
+ bufferPool.returnBuffer(buffer, bytesToProcess);
+ }
+ }
+ }
+
+ protected abstract void processBuffer(ByteBuffer buffer) throws IOException;
+
+ @Override
+ public final void signalEndOfStream() {
+ streamEnded.set(true);
+ }
+
+ /**
+ * Convenience method that is called when the consumer is done processing
+ * based on being told the signal is end of stream and has processed all
+ * given buffers.
+ */
+ protected void onConsumerDone() {
+ }
+
+ @Override
+ public final boolean isConsumerFinished() {
+ return consumerEnded.get();
+ }
+
+ @Override
+ public final String getId() {
+ return uniqueId;
+ }
+
+ @Override
+ public final boolean equals(final Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj == this) {
+ return true;
+ }
+ if (obj.getClass() != getClass()) {
+ return false;
+ }
+ AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
+ return new EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, rhs.uniqueId).isEquals();
+ }
+
+ @Override
+ public final int hashCode() {
+ return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
+ }
+
+ @Override
+ public final String toString() {
+ return new ToStringBuilder(this, ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
new file mode 100644
index 0000000..d75b7d7
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.nifi.io.nio.BufferPool;
+
+/**
+ * A StreamConsumer must be thread safe. It may be accessed concurrently by a
+ * thread providing data to process and another thread that is processing that
+ * data.
+ *
+ * @author none
+ */
+public interface StreamConsumer {
+
+ /**
+ * Will be called once just after construction. It provides the queue to
+ * which processed and emptied and cleared buffers must be returned. For
+ * each time <code>addFilledBuffer</code> is called there should be an
+ * associated add to this given queue. If not, buffers will run out and all
+ * stream processing will halt. READ THIS!!!
+ *
+ * @param returnQueue
+ */
+ void setReturnBufferQueue(BufferPool returnQueue);
+
+ /**
+ * Will be called by the thread that produces byte buffers with available
+ * data to be processed. If the consumer is finished this should simply
+ * return the given buffer to the return buffer queue (after it is cleared)
+ *
+ * @param buffer
+ */
+ void addFilledBuffer(ByteBuffer buffer);
+
+ /**
+ * Will be called by the thread that executes the consumption of data. May
+ * be called many times though once <code>isConsumerFinished</code> returns
+ * true this method will likely do nothing.
+ * @throws java.io.IOException
+ */
+ void process() throws IOException;
+
+ /**
+ * Called once the end of the input stream is detected
+ */
+ void signalEndOfStream();
+
+ /**
+ * If true signals the consumer is done consuming data and will not process
+ * any more buffers.
+ *
+ * @return
+ */
+ boolean isConsumerFinished();
+
+ /**
+ * Uniquely identifies the consumer
+ *
+ * @return
+ */
+ String getId();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
new file mode 100644
index 0000000..df298d5
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+/**
+ *
+ * @author none
+ */
+public interface StreamConsumerFactory {
+
+ StreamConsumer newInstance(String streamId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git a/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
new file mode 100644
index 0000000..7ed5ad4
--- /dev/null
+++ b/commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.nifi.util.NiFiProperties;
+
+public class SSLContextFactory {
+
+ private final String keystore;
+ private final char[] keystorePass;
+ private final String keystoreType;
+ private final String truststore;
+ private final char[] truststorePass;
+ private final String truststoreType;
+
+ private final KeyManager[] keyManagers;
+ private final TrustManager[] trustManagers;
+
+ public SSLContextFactory(final NiFiProperties properties) throws NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, KeyStoreException, UnrecoverableKeyException {
+ keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
+ keystorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
+ keystoreType = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);
+
+ truststore = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
+ truststorePass = getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
+ truststoreType = properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);
+
+ // prepare the keystore
+ final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+ keyStore.load(new FileInputStream(keystore), keystorePass);
+ final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, keystorePass);
+
+ // prepare the truststore
+ final KeyStore trustStore = KeyStore.getInstance(truststoreType);
+ trustStore.load(new FileInputStream(truststore), truststorePass);
+ final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(trustStore);
+
+ keyManagers = keyManagerFactory.getKeyManagers();
+ trustManagers = trustManagerFactory.getTrustManagers();
+ }
+
+ private static char[] getPass(final String password) {
+ return password == null ? null : password.toCharArray();
+ }
+
+ /**
+ * Creates a SSLContext instance using the given information.
+ *
+ *
+ * @return a SSLContext instance
+ * @throws java.security.KeyStoreException
+ * @throws java.io.IOException
+ * @throws java.security.NoSuchAlgorithmException
+ * @throws java.security.cert.CertificateException
+ * @throws java.security.UnrecoverableKeyException
+ * @throws java.security.KeyManagementException
+ */
+ public SSLContext createSslContext() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException,
+ UnrecoverableKeyException, KeyManagementException {
+
+ // initialize the ssl context
+ final SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(keyManagers, trustManagers, new SecureRandom());
+ sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+
+ return sslContext;
+
+ }
+}