You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/10/18 11:45:57 UTC

[1/2] flume git commit: FLUME-3182 add support for SSL/TLS for syslog (tcp) sources

Repository: flume
Updated Branches:
  refs/heads/trunk 327a43dbe -> 965e13264


http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 9bcdf51..d7a69c9 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -33,6 +33,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -65,7 +66,6 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
-import org.apache.flume.util.SSLUtil;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -80,7 +80,7 @@ import org.slf4j.LoggerFactory;
  * The connections are intended to be opened before clients are given access so
  * that the object cannot ever be in an inconsistent when exposed to users.
  */
-public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
+public class NettyAvroRpcClient extends SSLContextAwareAbstractRpcClient {
 
   private ExecutorService callTimeoutPool;
   private final ReentrantLock stateLock = new ReentrantLock();
@@ -91,12 +91,6 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
   private ConnState connState;
 
   private InetSocketAddress address;
-  private boolean enableSsl;
-  private boolean trustAllCerts;
-  private String truststore;
-  private String truststorePassword;
-  private String truststoreType;
-  private final List<String> excludeProtocols = new LinkedList<String>();
 
   private Transceiver transceiver;
   private AvroSourceProtocol.Callback avroClient;
@@ -147,13 +141,14 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
             bossExecutor, workerExecutor,
             enableDeflateCompression, enableSsl, trustAllCerts,
             compressionLevel, truststore, truststorePassword, truststoreType,
-            excludeProtocols, maxIoWorkers);
+            excludeProtocols, includeProtocols, excludeCipherSuites, includeCipherSuites,
+            maxIoWorkers);
         } else {
           socketChannelFactory = new SSLCompressionChannelFactory(
             bossExecutor, workerExecutor,
             enableDeflateCompression, enableSsl, trustAllCerts,
             compressionLevel, truststore, truststorePassword, truststoreType,
-            excludeProtocols);
+            excludeProtocols, includeProtocols, excludeCipherSuites, includeCipherSuites);
         }
       } else {
         if (maxIoWorkers >= 1) {
@@ -582,28 +577,7 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
       }
     }
 
-    enableSsl = Boolean.parseBoolean(properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_SSL));
-    trustAllCerts = Boolean.parseBoolean(properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUST_ALL_CERTS));
-    truststore = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE, SSLUtil.getGlobalTruststorePath());
-    truststorePassword = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD,
-        SSLUtil.getGlobalTruststorePassword());
-    truststoreType = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE,
-        SSLUtil.getGlobalTruststoreType("JKS"));
-    String excludeProtocolsStr = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
-    if (excludeProtocolsStr == null) {
-      excludeProtocols.add("SSLv3");
-    } else {
-      excludeProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
-      if (!excludeProtocols.contains("SSLv3")) {
-        excludeProtocols.add("SSLv3");
-      }
-    }
+    configureSSL(properties);
 
     String maxIoWorkersStr = properties.getProperty(RpcClientConfigurationConstants.MAX_IO_WORKERS);
     if (!StringUtils.isEmpty(maxIoWorkersStr)) {
@@ -669,12 +643,16 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
     private final String truststore;
     private final String truststorePassword;
     private final String truststoreType;
-    private final List<String> excludeProtocols;
+    private final Set<String> excludeProtocols;
+    private final Set<String> includeProtocols;
+    private final Set<String> excludeCipherSuites;
+    private final Set<String> includeCipherSuites;
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
         int compressionLevel, String truststore, String truststorePassword,
-        String truststoreType, List<String> excludeProtocols) {
+        String truststoreType, Set<String> excludeProtocols, Set<String> includeProtocols,
+        Set<String> excludeCipherSuites, Set<String> includeCipherSuites) {
       super(bossExecutor, workerExecutor);
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
@@ -684,12 +662,16 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
       this.truststorePassword = truststorePassword;
       this.truststoreType = truststoreType;
       this.excludeProtocols = excludeProtocols;
+      this.includeProtocols = includeProtocols;
+      this.excludeCipherSuites = excludeCipherSuites;
+      this.includeCipherSuites = includeCipherSuites;
     }
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
         int compressionLevel, String truststore, String truststorePassword,
-        String truststoreType, List<String> excludeProtocols, int maxIOWorkers) {
+        String truststoreType, Set<String> excludeProtocols, Set<String> includeProtocols,
+        Set<String> excludeCipherSuites, Set<String> includeCipherSuites, int maxIOWorkers) {
       super(bossExecutor, workerExecutor, maxIOWorkers);
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
@@ -699,6 +681,9 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
       this.truststorePassword = truststorePassword;
       this.truststoreType = truststoreType;
       this.excludeProtocols = excludeProtocols;
+      this.includeProtocols = includeProtocols;
+      this.excludeCipherSuites = excludeCipherSuites;
+      this.includeCipherSuites = includeCipherSuites;
     }
 
     @Override
@@ -736,15 +721,29 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
           sslContext.init(null, managers, null);
           SSLEngine sslEngine = sslContext.createSSLEngine();
           sslEngine.setUseClientMode(true);
+
           List<String> enabledProtocols = new ArrayList<String>();
           for (String protocol : sslEngine.getEnabledProtocols()) {
-            if (!excludeProtocols.contains(protocol)) {
+            if ((includeProtocols.isEmpty() || includeProtocols.contains(protocol))
+                && !excludeProtocols.contains(protocol)) {
               enabledProtocols.add(protocol);
             }
           }
           sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+
+          List<String> enabledCipherSuites = new ArrayList<String>();
+          for (String suite : sslEngine.getEnabledCipherSuites()) {
+            if ((includeCipherSuites.isEmpty() || includeCipherSuites.contains(suite))
+                && !excludeCipherSuites.contains(suite)) {
+              enabledCipherSuites.add(suite);
+            }
+          }
+          sslEngine.setEnabledCipherSuites(enabledCipherSuites.toArray(new String[0]));
+
           logger.info("SSLEngine protocols enabled: " +
               Arrays.asList(sslEngine.getEnabledProtocols()));
+          logger.info("SSLEngine cipher suites enabled: " +
+              Arrays.asList(sslEngine.getEnabledProtocols()));
           // addFirst() will make SSL handling the first stage of decoding
           // and the last stage of encoding this must be added after
           // adding compression handling above

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index d83cf19..228ae66 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -144,6 +144,9 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
   public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
   public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols";
+  public static final String CONFIG_INCLUDE_PROTOCOLS = "include-protocols";
+  public static final String CONFIG_EXCLUDE_CIPHER_SUITES = "exclude-cipher-suites";
+  public static final String CONFIG_INCLUDE_CIPHER_SUITES = "include-cipher-suites";
 
   public static final String KERBEROS_KEY = "kerberos";
 

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/main/java/org/apache/flume/api/SSLContextAwareAbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/SSLContextAwareAbstractRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/SSLContextAwareAbstractRpcClient.java
new file mode 100644
index 0000000..34557f6
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/SSLContextAwareAbstractRpcClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.api;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.flume.FlumeException;
+import org.apache.flume.util.SSLUtil;
+
+public abstract class SSLContextAwareAbstractRpcClient extends AbstractRpcClient {
+  protected boolean enableSsl;
+  protected boolean trustAllCerts;
+  protected String truststore;
+  protected String truststorePassword;
+  protected String truststoreType;
+  protected final Set<String> excludeProtocols = new LinkedHashSet<>(Arrays.asList("SSLv3"));
+  protected final Set<String> includeProtocols = new LinkedHashSet<>();
+  protected final Set<String> excludeCipherSuites = new LinkedHashSet<>();
+  protected final Set<String> includeCipherSuites = new LinkedHashSet<>();
+
+  protected void configureSSL(Properties properties) throws FlumeException {
+    enableSsl = Boolean.parseBoolean(properties.getProperty(
+      RpcClientConfigurationConstants.CONFIG_SSL));
+    trustAllCerts = Boolean.parseBoolean(properties.getProperty(
+      RpcClientConfigurationConstants.CONFIG_TRUST_ALL_CERTS));
+    truststore = properties.getProperty(
+      RpcClientConfigurationConstants.CONFIG_TRUSTSTORE, SSLUtil.getGlobalTruststorePath());
+    truststorePassword = properties.getProperty(
+      RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD,
+      SSLUtil.getGlobalTruststorePassword());
+    truststoreType = properties.getProperty(
+      RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE,
+      SSLUtil.getGlobalTruststoreType("JKS"));
+    parseList(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS,
+        SSLUtil.getGlobalExcludeProtocols()),
+        excludeProtocols);
+    parseList(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_INCLUDE_PROTOCOLS,
+        SSLUtil.getGlobalIncludeProtocols()),
+        includeProtocols);
+    parseList(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_EXCLUDE_CIPHER_SUITES,
+        SSLUtil.getGlobalExcludeCipherSuites()),
+        excludeCipherSuites);
+    parseList(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_INCLUDE_CIPHER_SUITES,
+        SSLUtil.getGlobalIncludeCipherSuites()),
+        includeCipherSuites);
+  }
+
+  private void parseList(String value, Set<String> set) {
+    if (Objects.nonNull(value)) {
+      set.addAll(Arrays.asList(value.split(" ")));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
index 0048e61..26ccba8 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -24,7 +24,6 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.thrift.Status;
 import org.apache.flume.thrift.ThriftFlumeEvent;
 import org.apache.flume.thrift.ThriftSourceProtocol;
-import org.apache.flume.util.SSLUtil;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TFastFramedTransport;
@@ -41,7 +40,6 @@ import java.io.FileInputStream;
 import java.nio.ByteBuffer;
 import java.security.KeyStore;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -63,7 +61,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class ThriftRpcClient extends AbstractRpcClient {
+public class ThriftRpcClient extends SSLContextAwareAbstractRpcClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(ThriftRpcClient.class);
 
   /**
@@ -84,12 +82,6 @@ public class ThriftRpcClient extends AbstractRpcClient {
   private final Random random = new Random();
   private String protocol;
 
-  private boolean enableSsl;
-  private String truststore;
-  private String truststorePassword;
-  private String truststoreType;
-  private final List<String> excludeProtocols = new LinkedList<String>();
-
   public ThriftRpcClient() {
     stateLock = new ReentrantLock(true);
     connState = State.INIT;
@@ -314,29 +306,7 @@ public class ThriftRpcClient extends AbstractRpcClient {
         connectionPoolSize = RpcClientConfigurationConstants
             .DEFAULT_CONNECTION_POOL_SIZE;
       }
-
-      enableSsl = Boolean.parseBoolean(properties.getProperty(
-          RpcClientConfigurationConstants.CONFIG_SSL));
-      if (enableSsl) {
-        truststore = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE, SSLUtil.getGlobalTruststorePath());
-        truststorePassword = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD,
-            SSLUtil.getGlobalTruststorePassword());
-        truststoreType = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE,
-            SSLUtil.getGlobalTruststoreType("JKS"));
-        String excludeProtocolsStr = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
-        if (excludeProtocolsStr == null) {
-          excludeProtocols.add("SSLv3");
-        } else {
-          excludeProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
-          if (!excludeProtocols.contains("SSLv3")) {
-            excludeProtocols.add("SSLv3");
-          }
-        }
-      }
+      configureSSL(properties);
 
       connectionManager = new ConnectionPoolManager(connectionPoolSize);
       connState = State.READY;
@@ -385,7 +355,8 @@ public class ThriftRpcClient extends AbstractRpcClient {
 
         // Create the TSocket from that
         tsocket = createSSLSocket(
-            sslSockFactory, hostname, port, 120000, excludeProtocols);
+            sslSockFactory, hostname, port, 120000, excludeProtocols,
+            includeProtocols, excludeCipherSuites, includeCipherSuites);
       } else {
         tsocket = new TSocket(hostname, port);
       }
@@ -538,7 +509,8 @@ public class ThriftRpcClient extends AbstractRpcClient {
   }
 
   private static TSocket createSSLSocket(SSLSocketFactory factory, String host,
-                                         int port, int timeout, List<String> excludeProtocols)
+      int port, int timeout, Set<String> excludeProtocols, Set<String> includeProtocols,
+      Set<String> excludeCipherSuites, Set<String> includeCipherSuites)
       throws FlumeException {
     try {
       SSLSocket socket = (SSLSocket) factory.createSocket(host, port);
@@ -546,11 +518,22 @@ public class ThriftRpcClient extends AbstractRpcClient {
 
       List<String> enabledProtocols = new ArrayList<String>();
       for (String protocol : socket.getEnabledProtocols()) {
-        if (!excludeProtocols.contains(protocol)) {
+        if ((includeProtocols.isEmpty() || includeProtocols.contains(protocol))
+            && !excludeProtocols.contains(protocol)) {
           enabledProtocols.add(protocol);
         }
       }
       socket.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+
+      List<String> enabledCipherSuites = new ArrayList<String>();
+      for (String suite : socket.getEnabledCipherSuites()) {
+        if ((includeCipherSuites.isEmpty() || includeCipherSuites.contains(suite))
+            && !excludeCipherSuites.contains(suite)) {
+          enabledCipherSuites.add(suite);
+        }
+      }
+      socket.setEnabledCipherSuites(enabledCipherSuites.toArray(new String[0]));
+
       return new TSocket(socket);
     } catch (Exception e) {
       throw new FlumeException("Could not connect to " + host + " on port " + port, e);

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java b/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
index 02fe8ed..04a7d5c 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
@@ -31,6 +31,10 @@ public class SSLUtil {
   private static final String SYS_PROP_TRUSTSTORE_PATH = "javax.net.ssl.trustStore";
   private static final String SYS_PROP_TRUSTSTORE_PASSWORD = "javax.net.ssl.trustStorePassword";
   private static final String SYS_PROP_TRUSTSTORE_TYPE = "javax.net.ssl.trustStoreType";
+  private static final String SYS_PROP_INCLUDE_PROTOCOLS = "flume.ssl.include.protocols";
+  private static final String SYS_PROP_EXCLUDE_PROTOCOLS = "flume.ssl.exclude.protocols";
+  private static final String SYS_PROP_INCLUDE_CIPHERSUITES = "flume.ssl.include.cipherSuites";
+  private static final String SYS_PROP_EXCLUDE_CIPHERSUITES = "flume.ssl.exclude.cipherSuites";
 
   private static final String ENV_VAR_KEYSTORE_PATH = "FLUME_SSL_KEYSTORE_PATH";
   private static final String ENV_VAR_KEYSTORE_PASSWORD = "FLUME_SSL_KEYSTORE_PASSWORD";
@@ -38,6 +42,10 @@ public class SSLUtil {
   private static final String ENV_VAR_TRUSTSTORE_PATH = "FLUME_SSL_TRUSTSTORE_PATH";
   private static final String ENV_VAR_TRUSTSTORE_PASSWORD = "FLUME_SSL_TRUSTSTORE_PASSWORD";
   private static final String ENV_VAR_TRUSTSTORE_TYPE = "FLUME_SSL_TRUSTSTORE_TYPE";
+  private static final String ENV_VAR_INCLUDE_PROTOCOLS  = "FLUME_SSL_INCLUDE_PROTOCOLS";
+  private static final String ENV_VAR_EXCLUDE_PROTOCOLS = "FLUME_SSL_EXCLUDE_PROTOCOLS";
+  private static final String ENV_VAR_INCLUDE_CIPHERSUITES = "FLUME_SSL_INCLUDE_CIPHERSUITES";
+  private static final String ENV_VAR_EXCLUDE_CIPHERSUITES = "FLUME_SSL_EXCLUDE_CIPHERSUITES";
 
   private static final String DESCR_KEYSTORE_PATH = "keystore path";
   private static final String DESCR_KEYSTORE_PASSWORD = "keystore password";
@@ -45,6 +53,10 @@ public class SSLUtil {
   private static final String DESCR_TRUSTSTORE_PATH = "truststore path";
   private static final String DESCR_TRUSTSTORE_PASSWORD = "truststore password";
   private static final String DESCR_TRUSTSTORE_TYPE = "truststore type";
+  private static final String DESCR_INCLUDE_PROTOCOLS = "include protocols";
+  private static final String DESCR_EXCLUDE_PROTOCOLS = "exclude protocols";
+  private static final String DESCR_INCLUDE_CIPHERSUITES = "include cipher suites";
+  private static final String DESCR_EXCLUDE_CIPHERSUITES = "exclude cipher suites";
 
   public static void initGlobalSSLParameters() {
     initSysPropFromEnvVar(
@@ -59,6 +71,14 @@ public class SSLUtil {
         SYS_PROP_TRUSTSTORE_PASSWORD, ENV_VAR_TRUSTSTORE_PASSWORD, DESCR_TRUSTSTORE_PASSWORD);
     initSysPropFromEnvVar(
         SYS_PROP_TRUSTSTORE_TYPE, ENV_VAR_TRUSTSTORE_TYPE, DESCR_TRUSTSTORE_TYPE);
+    initSysPropFromEnvVar(
+        SYS_PROP_INCLUDE_PROTOCOLS, ENV_VAR_INCLUDE_PROTOCOLS, DESCR_INCLUDE_PROTOCOLS);
+    initSysPropFromEnvVar(
+        SYS_PROP_EXCLUDE_PROTOCOLS, ENV_VAR_EXCLUDE_PROTOCOLS, DESCR_EXCLUDE_PROTOCOLS);
+    initSysPropFromEnvVar(
+        SYS_PROP_INCLUDE_CIPHERSUITES, ENV_VAR_INCLUDE_CIPHERSUITES, DESCR_INCLUDE_CIPHERSUITES);
+    initSysPropFromEnvVar(
+        SYS_PROP_EXCLUDE_CIPHERSUITES, ENV_VAR_EXCLUDE_CIPHERSUITES, DESCR_EXCLUDE_CIPHERSUITES);
   }
 
   private static void initSysPropFromEnvVar(String sysPropName, String envVarName,
@@ -103,4 +123,24 @@ public class SSLUtil {
     return sysPropValue != null ? sysPropValue : defaultValue;
   }
 
+  public static String getGlobalExcludeProtocols() {
+    return normalizeProperty(SYS_PROP_EXCLUDE_PROTOCOLS);
+  }
+
+  public static String getGlobalIncludeProtocols() {
+    return normalizeProperty(SYS_PROP_INCLUDE_PROTOCOLS);
+  }
+
+  public static String getGlobalExcludeCipherSuites() {
+    return normalizeProperty(SYS_PROP_EXCLUDE_CIPHERSUITES);
+  }
+
+  public static String getGlobalIncludeCipherSuites() {
+    return normalizeProperty(SYS_PROP_INCLUDE_CIPHERSUITES);
+  }
+
+  private static String normalizeProperty(String name) {
+    String property = System.getProperty(name);
+    return property == null ? null : property.replaceAll(",", " ");
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilListTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilListTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilListTest.java
new file mode 100644
index 0000000..a9e3b6f
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilListTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.runners.Parameterized.Parameters;
+
+public abstract class AbstractSSLUtilListTest extends AbstractSSLUtilTest {
+  @Parameters
+  public static Collection<?> data() {
+    return Arrays.asList(new Object[][]{
+      // system property value, environment variable value, expected value
+      { null, null, null },
+      { "sysprop", null, "sysprop" },
+      { "sysprop,sysprop", null, "sysprop sysprop" },
+      { null, "envvar", "envvar" },
+      { null, "envvar,envvar", "envvar envvar" },
+      { "sysprop", "envvar", "sysprop" },
+      { "sysprop,sysprop", "envvar,envvar", "sysprop sysprop" }
+    });
+  }
+
+  protected AbstractSSLUtilListTest(String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeCipherSuitesTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeCipherSuitesTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeCipherSuitesTest.java
new file mode 100644
index 0000000..f176c5e
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeCipherSuitesTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilExcludeCipherSuitesTest extends AbstractSSLUtilListTest {
+
+  public SSLUtilExcludeCipherSuitesTest(
+      String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "flume.ssl.exclude.cipherSuites";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_EXCLUDE_CIPHERSUITES";
+  }
+
+  @Test
+  public void testIncludeProtocols() {
+    SSLUtil.initGlobalSSLParameters();
+    String actualValue = SSLUtil.getGlobalExcludeCipherSuites();
+
+    Assert.assertEquals(expectedValue, actualValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeProtocolsTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeProtocolsTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeProtocolsTest.java
new file mode 100644
index 0000000..8e66dfd
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilExcludeProtocolsTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilExcludeProtocolsTest extends AbstractSSLUtilListTest {
+
+  public SSLUtilExcludeProtocolsTest(
+      String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "flume.ssl.exclude.protocols";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_EXCLUDE_PROTOCOLS";
+  }
+
+  @Test
+  public void testExcludeProtocols() {
+    SSLUtil.initGlobalSSLParameters();
+    String actualValue = SSLUtil.getGlobalExcludeProtocols();
+
+    Assert.assertEquals(expectedValue, actualValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeCipherSuitesTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeCipherSuitesTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeCipherSuitesTest.java
new file mode 100644
index 0000000..7625c23
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeCipherSuitesTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilIncludeCipherSuitesTest extends AbstractSSLUtilListTest {
+
+  public SSLUtilIncludeCipherSuitesTest(
+      String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "flume.ssl.include.cipherSuites";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_INCLUDE_CIPHERSUITES";
+  }
+
+  @Test
+  public void testIncludeProtocols() {
+    SSLUtil.initGlobalSSLParameters();
+    String actualValue = SSLUtil.getGlobalIncludeCipherSuites();
+
+    Assert.assertEquals(expectedValue, actualValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeProtocolsTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeProtocolsTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeProtocolsTest.java
new file mode 100644
index 0000000..597dba6
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilIncludeProtocolsTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilIncludeProtocolsTest extends AbstractSSLUtilListTest {
+
+  public SSLUtilIncludeProtocolsTest(
+      String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "flume.ssl.include.protocols";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_INCLUDE_PROTOCOLS";
+  }
+
+  @Test
+  public void testIncludeProtocols() {
+    SSLUtil.initGlobalSSLParameters();
+    String actualValue = SSLUtil.getGlobalIncludeProtocols();
+
+    Assert.assertEquals(expectedValue, actualValue);
+  }
+
+}


[2/2] flume git commit: FLUME-3182 add support for SSL/TLS for syslog (tcp) sources

Posted by sz...@apache.org.
FLUME-3182 add support for SSL/TLS for syslog (tcp) sources

Adding SSL/TLS support for syslog and multi port syslog sources
This change also contains some refactoring to avoid code duplication in
sources with SSL/TLS support

As SSL/TLS handling is refactored it is convinient to
add the possibility to specify enabled protocol list FLUME-3275 and
add the possibility to specify enabled cipher suite list FLUME-3276

This closes #230

Reviewers: Peter Turcsanyi, Endre Major

(Ferenc Szabo via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/965e1326
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/965e1326
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/965e1326

Branch: refs/heads/trunk
Commit: 965e13264a304d500540593288f65157cc6fdd55
Parents: 327a43d
Author: Ferenc Szabo <sz...@apache.org>
Authored: Thu Oct 18 13:44:21 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Thu Oct 18 13:44:21 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     | 114 +-------
 .../flume/source/MultiportSyslogTCPSource.java  |  17 +-
 .../source/SslContextAwareAbstractSource.java   | 199 ++++++++++++++
 .../apache/flume/source/SyslogTcpSource.java    |  62 +++--
 .../org/apache/flume/source/ThriftSource.java   |  75 +-----
 .../apache/flume/source/http/HTTPSource.java    |  83 +++---
 .../http/HTTPSourceConfigurationConstants.java  |   4 +
 .../source/TestMultiportSyslogTCPSource.java    | 105 +++++++-
 .../flume/source/TestSyslogTcpSource.java       |  74 +++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 266 ++++++++++++-------
 .../apache/flume/api/NettyAvroRpcClient.java    |  71 +++--
 .../api/RpcClientConfigurationConstants.java    |   3 +
 .../api/SSLContextAwareAbstractRpcClient.java   |  77 ++++++
 .../org/apache/flume/api/ThriftRpcClient.java   |  53 ++--
 .../java/org/apache/flume/util/SSLUtil.java     |  40 +++
 .../flume/util/AbstractSSLUtilListTest.java     |  44 +++
 .../util/SSLUtilExcludeCipherSuitesTest.java    |  49 ++++
 .../flume/util/SSLUtilExcludeProtocolsTest.java |  49 ++++
 .../util/SSLUtilIncludeCipherSuitesTest.java    |  49 ++++
 .../flume/util/SSLUtilIncludeProtocolsTest.java |  49 ++++
 20 files changed, 1073 insertions(+), 410 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index e7b12bd..ac3d51d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -19,7 +19,6 @@
 
 package org.apache.flume.source;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.NettyTransceiver;
@@ -41,7 +40,6 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
-import org.apache.flume.util.SSLUtil;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
@@ -55,22 +53,19 @@ import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
-import java.io.FileInputStream;
+
 import java.net.InetSocketAddress;
-import java.security.KeyStore;
-import java.security.Security;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 /**
  * <p>
@@ -127,7 +122,7 @@ import java.util.concurrent.TimeUnit;
  * TODO
  * </p>
  */
-public class AvroSource extends AbstractSource implements EventDrivenSource,
+public class AvroSource extends SslContextAwareAbstractSource implements EventDrivenSource,
     Configurable, AvroSourceProtocol {
 
   private static final String THREADS = "threads";
@@ -138,21 +133,11 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private static final String PORT_KEY = "port";
   private static final String BIND_KEY = "bind";
   private static final String COMPRESSION_TYPE = "compression-type";
-  private static final String SSL_KEY = "ssl";
   private static final String IP_FILTER_KEY = "ipFilter";
   private static final String IP_FILTER_RULES_KEY = "ipFilterRules";
-  private static final String KEYSTORE_KEY = "keystore";
-  private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
-  private static final String KEYSTORE_TYPE_KEY = "keystore-type";
-  private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
   private int port;
   private String bindAddress;
   private String compressionType;
-  private String keystore;
-  private String keystorePassword;
-  private String keystoreType;
-  private final List<String> excludeProtocols = new LinkedList<String>();
-  private boolean enableSsl = false;
   private boolean enableIpFilter;
   private String patternRuleConfigDefinition;
 
@@ -167,6 +152,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
 
   @Override
   public void configure(Context context) {
+    configureSsl(context);
     Configurables.ensureRequiredNonNull(context, PORT_KEY, BIND_KEY);
 
     port = context.getInteger(PORT_KEY);
@@ -180,35 +166,6 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
               context.getString(THREADS));
     }
 
-    enableSsl = context.getBoolean(SSL_KEY, false);
-    keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath());
-    keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY,
-        SSLUtil.getGlobalKeystorePassword());
-    keystoreType = context.getString(KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType("JKS"));
-    String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS);
-    if (excludeProtocolsStr == null) {
-      excludeProtocols.add("SSLv3");
-    } else {
-      excludeProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
-      if (!excludeProtocols.contains("SSLv3")) {
-        excludeProtocols.add("SSLv3");
-      }
-    }
-
-    if (enableSsl) {
-      Preconditions.checkNotNull(keystore,
-          KEYSTORE_KEY + " must be specified when SSL is enabled");
-      Preconditions.checkNotNull(keystorePassword,
-          KEYSTORE_PASSWORD_KEY + " must be specified when SSL is enabled");
-      try {
-        KeyStore ks = KeyStore.getInstance(keystoreType);
-        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
-      } catch (Exception ex) {
-        throw new FlumeException(
-            "Avro source configured with invalid keystore: " + keystore, ex);
-      }
-    }
-
     enableIpFilter = context.getBoolean(IP_FILTER_KEY, false);
     if (enableIpFilter) {
       patternRuleConfigDefinition = context.getString(IP_FILTER_RULES_KEY);
@@ -283,11 +240,10 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private ChannelPipelineFactory initChannelPipelineFactory() {
     ChannelPipelineFactory pipelineFactory;
     boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
-    if (enableCompression || enableSsl || enableIpFilter) {
+    if (enableCompression || isSslEnabled() || enableIpFilter) {
       pipelineFactory = new AdvancedChannelPipelineFactory(
-        enableCompression, enableSsl, keystore,
-        keystorePassword, keystoreType, enableIpFilter,
-        patternRuleConfigDefinition);
+        enableCompression, enableIpFilter,
+        patternRuleConfigDefinition, getSslEngineSupplier(false));
     } else {
       pipelineFactory = Channels::pipeline;
     }
@@ -455,52 +411,19 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
       implements ChannelPipelineFactory {
 
     private boolean enableCompression;
-    private boolean enableSsl;
-    private String keystore;
-    private String keystorePassword;
-    private String keystoreType;
 
     private boolean enableIpFilter;
     private String patternRuleConfigDefinition;
+    private Supplier<Optional<SSLEngine>> sslEngineSupplier;
 
-    public AdvancedChannelPipelineFactory(boolean enableCompression,
-        boolean enableSsl, String keystore, String keystorePassword,
-        String keystoreType, boolean enableIpFilter,
-        String patternRuleConfigDefinition) {
+    public AdvancedChannelPipelineFactory(boolean enableCompression, boolean enableIpFilter,
+        String patternRuleConfigDefinition, Supplier<Optional<SSLEngine>> sslEngineSupplier) {
       this.enableCompression = enableCompression;
-      this.enableSsl = enableSsl;
-      this.keystore = keystore;
-      this.keystorePassword = keystorePassword;
-      this.keystoreType = keystoreType;
       this.enableIpFilter = enableIpFilter;
       this.patternRuleConfigDefinition = patternRuleConfigDefinition;
+      this.sslEngineSupplier = sslEngineSupplier;
     }
 
-    private SSLContext createServerSSLContext() {
-      try {
-        KeyStore ks = KeyStore.getInstance(keystoreType);
-        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
-
-        // Set up key manager factory to use our key store
-        KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
-        kmf.init(ks, keystorePassword.toCharArray());
-
-        SSLContext serverContext = SSLContext.getInstance("TLS");
-        serverContext.init(kmf.getKeyManagers(), null, null);
-        return serverContext;
-      } catch (Exception e) {
-        throw new Error("Failed to initialize the server-side SSLContext", e);
-      }
-    }
-
-    private String getAlgorithm() {
-      String algorithm = Security.getProperty(
-          "ssl.KeyManagerFactory.algorithm");
-      if (algorithm == null) {
-        algorithm = "SunX509";
-      }
-      return algorithm;
-    }
 
     @Override
     public ChannelPipeline getPipeline() throws Exception {
@@ -511,23 +434,14 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
         pipeline.addFirst("inflater", new ZlibDecoder());
       }
 
-      if (enableSsl) {
-        SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
-        sslEngine.setUseClientMode(false);
-        List<String> enabledProtocols = new ArrayList<String>();
-        for (String protocol : sslEngine.getEnabledProtocols()) {
-          if (!excludeProtocols.contains(protocol)) {
-            enabledProtocols.add(protocol);
-          }
-        }
-        sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+      sslEngineSupplier.get().ifPresent(sslEngine -> {
         logger.info("SSLEngine protocols enabled: " +
             Arrays.asList(sslEngine.getEnabledProtocols()));
         // addFirst() will make SSL handling the first stage of decoding
         // and the last stage of encoding this must be added after
         // adding compression handling above
         pipeline.addFirst("ssl", new SslHandler(sslEngine));
-      }
+      });
 
       if (enableIpFilter) {
 

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 82cb2f2..d6abd37 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.Charset;
@@ -30,6 +31,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ssl.SSLParameters;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
@@ -43,6 +47,7 @@ import org.apache.mina.core.buffer.IoBuffer;
 import org.apache.mina.core.service.IoHandlerAdapter;
 import org.apache.mina.core.session.IdleStatus;
 import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.ssl.SslFilter;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +55,7 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-public class MultiportSyslogTCPSource extends AbstractSource implements
+public class MultiportSyslogTCPSource extends SslContextAwareAbstractSource implements
         EventDrivenSource, Configurable, BatchSizeSupported {
 
   public static final Logger logger = LoggerFactory.getLogger(
@@ -77,6 +82,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
 
   @Override
   public void configure(Context context) {
+    configureSsl(context);
     String portsStr = context.getString(
             SyslogSourceConfigurationConstants.CONFIG_PORTS);
 
@@ -161,6 +167,15 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     } else {
       acceptor = new NioSocketAcceptor();
     }
+
+    getSslContextSupplier().get().ifPresent(sslContext -> {
+      SslFilter filter = new SslFilter(sslContext);
+      SSLParameters sslParameters = sslContext.getDefaultSSLParameters();
+      filter.setEnabledProtocols(getFilteredProtocols(sslParameters));
+      filter.setEnabledCipherSuites(getFilteredCipherSuites(sslParameters));
+      acceptor.getFilterChain().addFirst("ssl", filter);
+    });
+
     acceptor.setReuseAddress(true);
     acceptor.getSessionConfig().setReadBufferSize(readBufferSize);
     acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/SslContextAwareAbstractSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SslContextAwareAbstractSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SslContextAwareAbstractSource.java
new file mode 100644
index 0000000..f220ae2
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SslContextAwareAbstractSource.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.flume.source;
+
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.util.SSLUtil;
+
+public abstract class SslContextAwareAbstractSource extends AbstractSource {
+  private static final String SSL_ENABLED_KEY = "ssl";
+  private static final boolean SSL_ENABLED_DEFAULT_VALUE = false;
+  private static final String KEYSTORE_KEY = "keystore";
+  private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
+  private static final String KEYSTORE_TYPE_KEY = "keystore-type";
+  private static final String KEYSTORE_TYPE_DEFAULT_VALUE = "JKS";
+
+  private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
+  private static final String INCLUDE_PROTOCOLS = "include-protocols";
+
+  private static final String EXCLUDE_CIPHER_SUITES = "exclude-cipher-suites";
+  private static final String INCLUDE_CIPHER_SUITES = "include-cipher-suites";
+
+  private String keystore;
+  private String keystorePassword;
+  private String keystoreType;
+  private boolean sslEnabled = false;
+  private final Set<String> excludeProtocols = new LinkedHashSet<>(Arrays.asList("SSLv3"));
+  private final Set<String> includeProtocols = new LinkedHashSet<>();
+  private final Set<String> excludeCipherSuites = new LinkedHashSet<>();
+  private final Set<String> includeCipherSuites = new LinkedHashSet<>();
+
+
+  public String getKeystore() {
+    return keystore;
+  }
+
+  public String getKeystorePassword() {
+    return keystorePassword;
+  }
+
+  public String getKeystoreType() {
+    return keystoreType;
+  }
+
+  public Set<String> getExcludeProtocols() {
+    return excludeProtocols;
+  }
+
+  public Set<String> getIncludeProtocols() {
+    return includeProtocols;
+  }
+
+  public Set<String> getExcludeCipherSuites() {
+    return excludeCipherSuites;
+  }
+
+  public Set<String> getIncludeCipherSuites() {
+    return includeCipherSuites;
+  }
+
+  public boolean isSslEnabled() {
+    return sslEnabled;
+  }
+
+  protected void configureSsl(Context context) {
+    sslEnabled = context.getBoolean(SSL_ENABLED_KEY, SSL_ENABLED_DEFAULT_VALUE);
+    keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath());
+    keystorePassword = context.getString(
+        KEYSTORE_PASSWORD_KEY, SSLUtil.getGlobalKeystorePassword());
+    keystoreType = context.getString(
+        KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType(KEYSTORE_TYPE_DEFAULT_VALUE));
+
+    parseList(context.getString(EXCLUDE_PROTOCOLS, SSLUtil.getGlobalExcludeProtocols()),
+        excludeProtocols);
+    parseList(context.getString(INCLUDE_PROTOCOLS, SSLUtil.getGlobalIncludeProtocols()),
+        includeProtocols);
+    parseList(context.getString(EXCLUDE_CIPHER_SUITES, SSLUtil.getGlobalExcludeCipherSuites()),
+        excludeCipherSuites);
+    parseList(context.getString(INCLUDE_CIPHER_SUITES, SSLUtil.getGlobalIncludeCipherSuites()),
+        includeCipherSuites);
+
+    if (sslEnabled) {
+      Objects.requireNonNull(keystore,
+          KEYSTORE_KEY + " must be specified when SSL is enabled");
+      Objects.requireNonNull(keystorePassword,
+          KEYSTORE_PASSWORD_KEY + " must be specified when SSL is enabled");
+      try {
+        KeyStore ks = KeyStore.getInstance(keystoreType);
+        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
+      } catch (Exception ex) {
+        throw new FlumeException(
+          "Source " + getName() + " configured with invalid keystore: " + keystore, ex);
+      }
+    }
+  }
+
+  private Optional<SSLContext> getSslContext() {
+    if (sslEnabled) {
+      try {
+        KeyStore ks = KeyStore.getInstance(keystoreType);
+        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
+
+        // can be set with "ssl.KeyManagerFactory.algorithm"
+        String algorithm = KeyManagerFactory.getDefaultAlgorithm();
+        // Set up key manager factory to use our key store
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
+        kmf.init(ks, keystorePassword.toCharArray());
+
+        SSLContext serverContext = SSLContext.getInstance("TLS");
+        serverContext.init(kmf.getKeyManagers(), null, null);
+
+        return Optional.of(serverContext);
+      } catch (Exception e) {
+        throw new Error("Failed to initialize the server-side SSLContext", e);
+      }
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  private Optional<SSLEngine> getSslEngine(boolean useClientMode) {
+    return getSslContext().map(sslContext -> {
+      SSLEngine sslEngine = sslContext.createSSLEngine();
+      sslEngine.setUseClientMode(useClientMode);
+      sslEngine.setEnabledProtocols(
+          getFilteredProtocols(sslEngine.getEnabledProtocols()));
+      sslEngine.setEnabledCipherSuites(
+          getFilteredCipherSuites(sslEngine.getEnabledCipherSuites()));
+      return sslEngine;
+    });
+  }
+
+  protected Supplier<Optional<SSLContext>> getSslContextSupplier() {
+    return this::getSslContext;
+  }
+
+
+  protected Supplier<Optional<SSLEngine>> getSslEngineSupplier(boolean useClientMode) {
+    return () -> getSslEngine(useClientMode);
+  }
+
+  protected String[] getFilteredProtocols(SSLParameters sslParameters) {
+    return getFilteredProtocols(sslParameters.getProtocols());
+  }
+
+  private String[] getFilteredProtocols(String[] enabledProtocols) {
+    return Stream.of(enabledProtocols)
+      .filter(o -> includeProtocols.isEmpty() || includeProtocols.contains(o))
+      .filter(o -> !excludeProtocols.contains(o) )
+      .toArray(String[]::new);
+  }
+
+  protected String[] getFilteredCipherSuites(SSLParameters sslParameters) {
+    return getFilteredCipherSuites(sslParameters.getCipherSuites());
+  }
+
+  private String[] getFilteredCipherSuites(String[] enabledCipherSuites) {
+    return Stream.of(enabledCipherSuites)
+      .filter(o -> includeCipherSuites.isEmpty() || includeCipherSuites.contains(o))
+      .filter(o -> !excludeCipherSuites.contains(o))
+      .toArray(String[]::new);
+  }
+
+  private void parseList(String value, Set<String> set) {
+    if (Objects.nonNull(value)) {
+      set.addAll(Arrays.asList(value.split(" ")));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index 1a0432c..067c21b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -21,9 +21,13 @@ package org.apache.flume.source;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import javax.net.ssl.SSLEngine;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flume.ChannelException;
@@ -44,6 +48,7 @@ import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,8 +56,8 @@ import org.slf4j.LoggerFactory;
  * @deprecated use {@link MultiportSyslogTCPSource} instead.
  */
 @Deprecated
-public class SyslogTcpSource extends AbstractSource
-                             implements EventDrivenSource, Configurable {
+public class SyslogTcpSource extends SslContextAwareAbstractSource
+    implements EventDrivenSource, Configurable {
   private static final Logger logger = LoggerFactory.getLogger(SyslogTcpSource.class);
 
   private int port;
@@ -113,17 +118,10 @@ public class SyslogTcpSource extends AbstractSource
         Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
 
     ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
-    serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      @Override
-      public ChannelPipeline getPipeline() {
-        syslogTcpHandler handler = new syslogTcpHandler();
-        handler.setEventSize(eventSize);
-        handler.setFormater(formaterProp);
-        handler.setKeepFields(keepFields);
-        return Channels.pipeline(handler);
-      }
-    });
 
+    serverBootstrap.setPipelineFactory(new PipelineFactory(
+        eventSize, formaterProp, keepFields, getSslEngineSupplier(false)
+    ));
     logger.info("Syslog TCP Source starting...");
 
     if (host == null) {
@@ -158,17 +156,18 @@ public class SyslogTcpSource extends AbstractSource
 
   @Override
   public void configure(Context context) {
+    configureSsl(context);
     Configurables.ensureRequiredNonNull(context,
         SyslogSourceConfigurationConstants.CONFIG_PORT);
     port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
     host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
     eventSize = context.getInteger("eventSize", SyslogUtils.DEFAULT_SIZE);
     formaterProp = context.getSubProperties(
-        SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
+      SyslogSourceConfigurationConstants.CONFIG_FORMAT_PREFIX);
     keepFields = SyslogUtils.chooseFieldsToKeep(
-        context.getString(
-            SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
-            SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
+      context.getString(
+        SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS,
+        SyslogSourceConfigurationConstants.DEFAULT_KEEP_FIELDS));
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
@@ -189,4 +188,35 @@ public class SyslogTcpSource extends AbstractSource
   SourceCounter getSourceCounter() {
     return sourceCounter;
   }
+
+  private class PipelineFactory implements ChannelPipelineFactory {
+    private final Integer eventSize;
+    private final Map<String, String> formaterProp;
+    private final Set<String> keepFields;
+    private Supplier<Optional<SSLEngine>> sslEngineSupplier;
+
+    public PipelineFactory(Integer eventSize, Map<String, String> formaterProp,
+        Set<String> keepFields, Supplier<Optional<SSLEngine>> sslEngineSupplier) {
+      this.eventSize = eventSize;
+      this.formaterProp = formaterProp;
+      this.keepFields = keepFields;
+      this.sslEngineSupplier = sslEngineSupplier;
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() {
+      syslogTcpHandler handler = new syslogTcpHandler();
+      handler.setEventSize(eventSize);
+      handler.setFormater(formaterProp);
+      handler.setKeepFields(keepFields);
+
+      ChannelPipeline pipeline = Channels.pipeline(handler);
+
+      sslEngineSupplier.get().ifPresent(sslEngine -> {
+        pipeline.addFirst("ssl", new SslHandler(sslEngine));
+      });
+
+      return pipeline;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 637c42e..af33cf2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -34,7 +34,6 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.thrift.Status;
 import org.apache.flume.thrift.ThriftSourceProtocol;
 import org.apache.flume.thrift.ThriftFlumeEvent;
-import org.apache.flume.util.SSLUtil;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -54,19 +53,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLServerSocket;
 import javax.security.sasl.Sasl;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.security.KeyStore;
-import java.security.Security;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
@@ -76,7 +70,8 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.security.PrivilegedAction;
 
-public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource {
+public class ThriftSource extends SslContextAwareAbstractSource
+    implements Configurable, EventDrivenSource {
 
   public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class);
 
@@ -100,12 +95,6 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
   public static final String BINARY_PROTOCOL = "binary";
   public static final String COMPACT_PROTOCOL = "compact";
 
-  private static final String SSL_KEY = "ssl";
-  private static final String KEYSTORE_KEY = "keystore";
-  private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
-  private static final String KEYSTORE_TYPE_KEY = "keystore-type";
-  private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
-
   private static final String KERBEROS_KEY = "kerberos";
   private static final String AGENT_PRINCIPAL = "agent-principal";
   private static final String AGENT_KEYTAB = "agent-keytab";
@@ -117,17 +106,13 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
   private TServer server;
   private ExecutorService servingExecutor;
   private String protocol;
-  private String keystore;
-  private String keystorePassword;
-  private String keystoreType;
-  private final List<String> excludeProtocols = new LinkedList<String>();
-  private boolean enableSsl = false;
   private boolean enableKerberos = false;
   private String principal;
   private FlumeAuthenticator flumeAuth;
 
   @Override
   public void configure(Context context) {
+    configureSsl(context);
     logger.info("Configuring thrift source.");
     port = context.getInteger(CONFIG_PORT);
     Preconditions.checkNotNull(port, "Port must be specified for Thrift " +
@@ -159,34 +144,6 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
         "binary or compact are the only valid Thrift protocol types to " +
                 "choose from.");
 
-    enableSsl = context.getBoolean(SSL_KEY, false);
-    if (enableSsl) {
-      keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath());
-      keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY,
-          SSLUtil.getGlobalKeystorePassword());
-      keystoreType = context.getString(KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType("JKS"));
-      String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS);
-      if (excludeProtocolsStr == null) {
-        excludeProtocols.add("SSLv3");
-      } else {
-        excludeProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
-        if (!excludeProtocols.contains("SSLv3")) {
-          excludeProtocols.add("SSLv3");
-        }
-      }
-      Preconditions.checkNotNull(keystore,
-              KEYSTORE_KEY + " must be specified when SSL is enabled");
-      Preconditions.checkNotNull(keystorePassword,
-              KEYSTORE_PASSWORD_KEY + " must be specified when SSL is enabled");
-      try {
-        KeyStore ks = KeyStore.getInstance(keystoreType);
-        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
-      } catch (Exception ex) {
-        throw new FlumeException(
-                "Thrift source configured with invalid keystore: " + keystore, ex);
-      }
-    }
-
     principal = context.getString(AGENT_PRINCIPAL);
     String keytab = context.getString(AGENT_KEYTAB);
     enableKerberos = context.getBoolean(KERBEROS_KEY, false);
@@ -250,33 +207,23 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
     super.start();
   }
 
-  private String getkeyManagerAlgorithm() {
-    String algorithm = Security.getProperty(
-            "ssl.KeyManagerFactory.algorithm");
-    return (algorithm != null) ?
-            algorithm : KeyManagerFactory.getDefaultAlgorithm();
-  }
-
   private TServerTransport getSSLServerTransport() {
     try {
       TServerTransport transport;
       TSSLTransportFactory.TSSLTransportParameters params =
               new TSSLTransportFactory.TSSLTransportParameters();
 
-      params.setKeyStore(keystore, keystorePassword, getkeyManagerAlgorithm(), keystoreType);
+      params.setKeyStore(getKeystore(), getKeystorePassword(),
+          KeyManagerFactory.getDefaultAlgorithm(), getKeystoreType());
       transport = TSSLTransportFactory.getServerSocket(
               port, 120000, InetAddress.getByName(bindAddress), params);
 
       ServerSocket serverSock = ((TServerSocket) transport).getServerSocket();
       if (serverSock instanceof SSLServerSocket) {
         SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
-        List<String> enabledProtocols = new ArrayList<String>();
-        for (String protocol : sslServerSock.getEnabledProtocols()) {
-          if (!excludeProtocols.contains(protocol)) {
-            enabledProtocols.add(protocol);
-          }
-        }
-        sslServerSock.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+        SSLParameters sslParameters = sslServerSock.getSSLParameters();
+        sslServerSock.setEnabledCipherSuites(getFilteredCipherSuites(sslParameters));
+        sslServerSock.setEnabledProtocols(getFilteredProtocols(sslParameters));
       }
       return transport;
     } catch (Throwable throwable) {
@@ -303,7 +250,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
   }
 
   private TServer getTThreadedSelectorServer() {
-    if (enableSsl || enableKerberos) {
+    if (isSslEnabled() || enableKerberos) {
       return null;
     }
     Class<?> serverClass;
@@ -353,7 +300,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
 
   private TServer getTThreadPoolServer() {
     TServerTransport serverTransport;
-    if (enableSsl) {
+    if (isSslEnabled()) {
       serverTransport = getSSLServerTransport();
     } else {
       serverTransport = getTServerTransport();

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index e9324fb..f2c881c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -25,10 +25,9 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SourceCounter;
-import org.apache.flume.source.AbstractSource;
+import org.apache.flume.source.SslContextAwareAbstractSource;
 import org.apache.flume.tools.FlumeBeanConfigurator;
 import org.apache.flume.tools.HTTPServerConstraintUtil;
-import org.apache.flume.util.SSLUtil;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
 import org.eclipse.jetty.server.SecureRequestCustomizer;
@@ -49,9 +48,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -79,7 +76,7 @@ import java.util.Map;
  * A JSON handler which converts JSON objects to Flume events is provided.
  *
  */
-public class HTTPSource extends AbstractSource implements
+public class HTTPSource extends SslContextAwareAbstractSource implements
         EventDrivenSource, Configurable {
   /*
    * There are 2 ways of doing this:
@@ -101,21 +98,13 @@ public class HTTPSource extends AbstractSource implements
   private HTTPSourceHandler handler;
   private SourceCounter sourceCounter;
 
-  // SSL configuration variable
-  private volatile String keyStorePath;
-  private volatile String keyStorePassword;
-  private volatile Boolean sslEnabled;
-  private final List<String> excludedProtocols = new LinkedList<String>();
-
   private Context sourceContext;
 
   @Override
   public void configure(Context context) {
+    configureSsl(context);
     sourceContext = context;
     try {
-      // SSL related config
-      sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false);
-
       port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
       host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
           HTTPSourceConfigurationConstants.DEFAULT_BIND);
@@ -129,29 +118,6 @@ public class HTTPSource extends AbstractSource implements
               HTTPSourceConfigurationConstants.CONFIG_HANDLER,
               HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();
 
-      if (sslEnabled) {
-        LOG.debug("SSL configuration enabled");
-        keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE,
-                SSLUtil.getGlobalKeystorePath());
-        Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),
-                                    "Keystore is required for SSL Conifguration" );
-        keyStorePassword = context.getString(
-                HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD,
-                SSLUtil.getGlobalKeystorePassword());
-        Preconditions.checkArgument(keyStorePassword != null,
-            "Keystore password is required for SSL Configuration");
-        String excludeProtocolsStr =
-            context.getString(HTTPSourceConfigurationConstants.EXCLUDE_PROTOCOLS);
-        if (excludeProtocolsStr == null) {
-          excludedProtocols.add("SSLv3");
-        } else {
-          excludedProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
-          if (!excludedProtocols.contains("SSLv3")) {
-            excludedProtocols.add("SSLv3");
-          }
-        }
-      }
-
       @SuppressWarnings("unchecked")
       Class<? extends HTTPSourceHandler> clazz =
               (Class<? extends HTTPSourceHandler>)
@@ -199,24 +165,25 @@ public class HTTPSource extends AbstractSource implements
     httpConfiguration.addCustomizer(new SecureRequestCustomizer());
 
     FlumeBeanConfigurator.setConfigurationFields(httpConfiguration, sourceContext);
-    ServerConnector connector;
-
-    if (sslEnabled) {
+    ServerConnector connector = getSslContextSupplier().get().map(sslContext -> {
       SslContextFactory sslCtxFactory = new SslContextFactory();
+      sslCtxFactory.setSslContext(sslContext);
+      sslCtxFactory.setExcludeProtocols(getExcludeProtocols().toArray(new String[]{}));
+      sslCtxFactory.setIncludeProtocols(getIncludeProtocols().toArray(new String[]{}));
+      sslCtxFactory.setExcludeCipherSuites(getExcludeCipherSuites().toArray(new String[]{}));
+      sslCtxFactory.setIncludeCipherSuites(getIncludeCipherSuites().toArray(new String[]{}));
+
       FlumeBeanConfigurator.setConfigurationFields(sslCtxFactory, sourceContext);
-      sslCtxFactory.setExcludeProtocols(excludedProtocols.toArray(new String[0]));
-      sslCtxFactory.setKeyStorePath(keyStorePath);
-      sslCtxFactory.setKeyStorePassword(keyStorePassword);
-      
+
       httpConfiguration.setSecurePort(port);
       httpConfiguration.setSecureScheme("https");
 
-      connector = new ServerConnector(srv,
-          new SslConnectionFactory(sslCtxFactory,HttpVersion.HTTP_1_1.asString()),
-          new HttpConnectionFactory(httpConfiguration));
-    } else {
-      connector = new ServerConnector(srv, new HttpConnectionFactory(httpConfiguration));
-    }
+      return new ServerConnector(srv,
+        new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.asString()),
+        new HttpConnectionFactory(httpConfiguration));
+    }).orElse(
+        new ServerConnector(srv, new HttpConnectionFactory(httpConfiguration))
+    );
 
     connector.setPort(port);
     connector.setHost(host);
@@ -315,4 +282,20 @@ public class HTTPSource extends AbstractSource implements
       doPost(request, response);
     }
   }
+
+  @Override
+  protected void configureSsl(Context context) {
+    handleDeprecatedParameter(context, "ssl", "enableSSL");
+    handleDeprecatedParameter(context, "exclude-protocols", "excludeProtocols");
+    handleDeprecatedParameter(context, "keystore-password", "keystorePassword");
+
+    super.configureSsl(context);
+  }
+
+  private void handleDeprecatedParameter(Context context, String newParam, String oldParam) {
+    if (!context.containsKey(newParam) && context.containsKey(oldParam)) {
+      context.put(newParam, context.getString(oldParam));
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
index f1f7a90..ff59d38 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
@@ -35,9 +35,13 @@ public class HTTPSourceConfigurationConstants {
   public static final String DEFAULT_HANDLER =
           "org.apache.flume.source.http.JSONHandler";
 
+  @Deprecated
   public static final String SSL_KEYSTORE = "keystore";
+  @Deprecated
   public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword";
+  @Deprecated
   public static final String SSL_ENABLED = "enableSSL";
+  @Deprecated
   public static final String EXCLUDE_PROTOCOLS = "excludeProtocols";
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 8155a12..f132152 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -30,6 +30,7 @@ import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.security.cert.X509Certificate;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -37,6 +38,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiConsumer;
+
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
@@ -63,6 +66,11 @@ import org.mockito.internal.util.reflection.Whitebox;
 
 import static org.mockito.Mockito.*;
 
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
 public class TestMultiportSyslogTCPSource {
 
   private static final int getFreePort() throws IOException {
@@ -89,8 +97,9 @@ public class TestMultiportSyslogTCPSource {
   }
 
   private List<Integer> testNPorts(MultiportSyslogTCPSource source, Channel channel,
-                                   List<Event> channelEvents, int numPorts,
-                                   ChannelProcessor channelProcessor) throws IOException {
+      List<Event> channelEvents, int numPorts, ChannelProcessor channelProcessor,
+      BiConsumer<Integer, byte[]> eventSenderFuncton, Context additionalContext)
+      throws IOException {
     Context channelContext = new Context();
     channelContext.put("capacity", String.valueOf(2000));
     channelContext.put("transactionCapacity", String.valueOf(2000));
@@ -123,15 +132,12 @@ public class TestMultiportSyslogTCPSource {
     Context context = new Context();
     context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
         ports.toString().trim());
+    context.putAll(additionalContext.getParameters());
     source.configure(context);
     source.start();
 
-    Socket syslogSocket;
     for (int i = 0; i < numPorts; i++) {
-      syslogSocket = new Socket(
-          InetAddress.getLocalHost(), portList.get(i));
-      syslogSocket.getOutputStream().write(getEvent(i));
-      syslogSocket.close();
+      eventSenderFuncton.accept(portList.get(i), getEvent(i));
     }
 
     Transaction txn = channel.getTransaction();
@@ -151,7 +157,6 @@ public class TestMultiportSyslogTCPSource {
       txn.close();
     }
 
-
     return portList;
   }
 
@@ -166,9 +171,85 @@ public class TestMultiportSyslogTCPSource {
     int numPorts = 1000;
 
     List<Integer> portList = testNPorts(source, channel, channelEvents,
-        numPorts, null);
+        numPorts, null, getSimpleEventSender(), new Context());
 
     //Since events can arrive out of order, search for each event in the array
+    processEvents(channelEvents, numPorts, portList);
+    source.stop();
+  }
+
+  /**
+   * Basic test to exercise multiple-port parsing.
+   */
+  @Test
+  public void testMultiplePortsSSL() throws Exception {
+
+    SSLContext sslContext = SSLContext.getInstance("TLS");
+    sslContext.init(null, new TrustManager[]{new X509TrustManager() {
+        @Override
+        public void checkClientTrusted(X509Certificate[] certs, String s) {
+          // nothing
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] certs, String s) {
+          // nothing
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+          return new X509Certificate[0];
+        }
+      } },
+        null);
+
+    SocketFactory socketFactory = sslContext.getSocketFactory();
+
+    Context context = new Context();
+    context.put("ssl", "true");
+    context.put("keystore", "src/test/resources/server.p12");
+    context.put("keystore-password", "password");
+    context.put("keystore-type", "PKCS12");
+
+
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+    List<Event> channelEvents = new ArrayList<>();
+    int numPorts = 10;
+
+    List<Integer> portList = testNPorts(source, channel, channelEvents,
+        numPorts, null, getSSLEventSender(socketFactory), context);
+
+    //Since events can arrive out of order, search for each event in the array
+    processEvents(channelEvents, numPorts, portList);
+    source.stop();
+  }
+
+  private BiConsumer<Integer, byte[]> getSSLEventSender(SocketFactory socketFactory) {
+    return (port, event) -> {
+      try {
+        Socket syslogSocket = socketFactory.createSocket(InetAddress.getLocalHost(), port);
+        syslogSocket.getOutputStream().write(event);
+        syslogSocket.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    };
+  }
+
+  private BiConsumer<Integer, byte[]> getSimpleEventSender() {
+    return (Integer port, byte[] event) -> {
+      try {
+        Socket syslogSocket = new Socket(InetAddress.getLocalHost(), port);
+        syslogSocket.getOutputStream().write(event);
+        syslogSocket.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    };
+  }
+
+  private void processEvents(List<Event> channelEvents, int numPorts, List<Integer> portList) {
     for (int i = 0; i < numPorts ; i++) {
       Iterator<Event> iter = channelEvents.iterator();
       while (iter.hasNext()) {
@@ -179,7 +260,7 @@ public class TestMultiportSyslogTCPSource {
         if (headers.containsKey(
             SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER)) {
           port = Integer.parseInt(headers.get(
-                SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER));
+              SyslogSourceConfigurationConstants.DEFAULT_PORT_HEADER));
         }
         iter.remove();
 
@@ -196,7 +277,6 @@ public class TestMultiportSyslogTCPSource {
         }
       }
     }
-    source.stop();
   }
 
   /**
@@ -442,7 +522,8 @@ public class TestMultiportSyslogTCPSource {
     doThrow(new ChannelException("dummy")).doNothing().when(cp)
         .processEventBatch(anyListOf(Event.class));
     try {
-      testNPorts(source, channel, channelEvents, 1, cp);
+      testNPorts(source, channel, channelEvents, 1, cp,
+          getSimpleEventSender(), new Context());
     } catch (Exception e) {
       //
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index fbacdec..9398707 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -36,15 +36,22 @@ import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetAddress;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
 public class TestSyslogTcpSource {
   private static final org.slf4j.Logger logger =
       LoggerFactory.getLogger(TestSyslogTcpSource.class);
@@ -63,6 +70,10 @@ public class TestSyslogTcpSource {
       data1 + "\n";
 
   private void init(String keepFields) {
+    init(keepFields, new Context());
+  }
+
+  private void init(String keepFields, Context context) {
     source = new SyslogTcpSource();
     channel = new MemoryChannel();
 
@@ -75,13 +86,22 @@ public class TestSyslogTcpSource {
     rcs.setChannels(channels);
 
     source.setChannelProcessor(new ChannelProcessor(rcs));
-    Context context = new Context();
     context.put("port", String.valueOf(TEST_SYSLOG_PORT));
     context.put("keepFields", keepFields);
 
     source.configure(context);
 
   }
+
+  private void initSsl() {
+    Context context = new Context();
+    context.put("ssl", "true");
+    context.put("keystore", "src/test/resources/server.p12");
+    context.put("keystore-password", "password");
+    context.put("keystore-type", "PKCS12");
+    init("none", context);
+  }
+
   /** Tests the keepFields configuration parameter (enabled or disabled)
    using SyslogTcpSource.*/
   private void runKeepFieldsTest(String keepFields) throws IOException {
@@ -161,8 +181,8 @@ public class TestSyslogTcpSource {
   @Test
   public void testSourceCounter() throws IOException {
     runKeepFieldsTest("all");
-    Assert.assertEquals(10, source.getSourceCounter().getEventAcceptedCount());
-    Assert.assertEquals(10, source.getSourceCounter().getEventReceivedCount());
+    assertEquals(10, source.getSourceCounter().getEventAcceptedCount());
+    assertEquals(10, source.getSourceCounter().getEventReceivedCount());
   }
 
   @Test
@@ -174,7 +194,7 @@ public class TestSyslogTcpSource {
     for (int i = 0; i < 10 && source.getSourceCounter().getChannelWriteFail() == 0; i++) {
       Thread.sleep(100);
     }
-    Assert.assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+    assertEquals(1, source.getSourceCounter().getChannelWriteFail());
   }
 
   @Test
@@ -186,7 +206,7 @@ public class TestSyslogTcpSource {
     for (int i = 0; i < 10 && source.getSourceCounter().getEventReadFail() == 0; i++) {
       Thread.sleep(100);
     }
-    Assert.assertEquals(1, source.getSourceCounter().getEventReadFail());
+    assertEquals(1, source.getSourceCounter().getEventReadFail());
   }
 
   private void errorCounterCommon(Exception e) throws IOException {
@@ -202,5 +222,47 @@ public class TestSyslogTcpSource {
     }
   }
 
+  @Test
+  public void testSSLMessages() throws Exception {
+    initSsl();
+
+    source.start();
+    InetSocketAddress address = source.getBoundAddress();
+
+    SSLContext sslContext = SSLContext.getInstance("TLS");
+    sslContext.init(null, new TrustManager[]{new X509TrustManager() {
+        @Override
+        public void checkClientTrusted(X509Certificate[] certs, String s) {
+          // nothing
+        }
+
+        @Override
+        public void checkServerTrusted(X509Certificate[] certs, String s) {
+          // nothing
+        }
+
+        @Override
+        public X509Certificate[] getAcceptedIssuers() {
+          return new X509Certificate[0];
+        }
+      } },
+        null);
+    SocketFactory socketFactory = sslContext.getSocketFactory();
+    Socket socket = socketFactory.createSocket();
+    socket.connect(address);
+    OutputStream outputStream = socket.getOutputStream();
+    outputStream.write(bodyWithTandH.getBytes());
+    socket.close();
+   // Thread.sleep(100);
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    Event event = channel.take();
+    assertEquals(new String(event.getBody()), data1);
+    transaction.commit();
+    transaction.close();
+
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/965e1326/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 16adc79..9d163c7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -755,19 +755,21 @@ SSL/TLS support
 Several Flume components support the SSL/TLS protocols in order to communicate with other systems
 securely.
 
-===============  ======================
-Component        SSL server or client
-===============  ======================
-Avro Source      server
-Avro Sink        client
-Thrift Source    server
-Thrift Sink      client
-Kafka Source     client
-Kafka Channel    client
-Kafka Sink       client
-HTTP Source      server
-JMS Source       client
-===============  ======================
+===========================  ======================
+Component                    SSL server or client
+===========================  ======================
+Avro Source                  server
+Avro Sink                    client
+Thrift Source                server
+Thrift Sink                  client
+Kafka Source                 client
+Kafka Channel                client
+Kafka Sink                   client
+HTTP Source                  server
+JMS Source                   client
+Syslog TCP Source            server
+Multiport Syslog TCP Source  server
+===========================  ======================
 
 The SSL compatible components have several configuration parameters to set up SSL, like
 enable SSL flag, keystore / truststore parameters (location, password, type) and additional
@@ -799,6 +801,12 @@ javax.net.ssl.keyStoreType          FLUME_SSL_KEYSTORE_TYPE          Keystore ty
 javax.net.ssl.trustStore            FLUME_SSL_TRUSTSTORE_PATH        Truststore location
 javax.net.ssl.trustStorePassword    FLUME_SSL_TRUSTSTORE_PASSWORD    Truststore password
 javax.net.ssl.trustStoreType        FLUME_SSL_TRUSTSTORE_TYPE        Truststore type (by default JKS)
+flume.ssl.include.protocols         FLUME_SSL_INCLUDE_PROTOCOLS      Protocols to include when calculating enabled protocols. A comma (,) separated list.
+                                                                     Excluded protocols will be excluded from this list if provided.
+flume.ssl.exclude.protocols         FLUME_SSL_EXCLUDE_PROTOCOLS      Protocols to exclude when calculating enabled protocols. A comma (,) separated list.
+flume.ssl.include.cipherSuites      FLUME_SSL_INCLUDE_CIPHERSUITES   Cipher suites to include when calculating enabled cipher suites. A comma (,) separated list.
+                                                                     Excluded cipher suites will be excluded from this list if provided.
+flume.ssl.exclude.cipherSuites      FLUME_SSL_EXCLUDE_CIPHERSUITES   Cipher suites to exclude when calculating enabled cipher suites. A comma (,) separated list.
 ==================================  ===============================  ==================================
 
 The SSL system properties can either be passed on the command line or by setting the ``JAVA_OPTS``
@@ -856,36 +864,44 @@ When paired with the built-in Avro Sink on another (previous hop) Flume agent,
 it can create tiered collection topologies.
 Required properties are in **bold**.
 
-==================   ================  ===================================================
-Property Name        Default           Description
-==================   ================  ===================================================
-**channels**         --
-**type**             --                The component type name, needs to be ``avro``
-**bind**             --                hostname or IP address to listen on
-**port**             --                Port # to bind to
-threads              --                Maximum number of worker threads to spawn
+===================   ================  ===================================================
+Property Name         Default           Description
+===================   ================  ===================================================
+**channels**          --
+**type**              --                The component type name, needs to be ``avro``
+**bind**              --                hostname or IP address to listen on
+**port**              --                Port # to bind to
+threads               --                Maximum number of worker threads to spawn
 selector.type
 selector.*
-interceptors         --                Space-separated list of interceptors
+interceptors          --                Space-separated list of interceptors
 interceptors.*
-compression-type     none              This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-ssl                  false             Set this to true to enable SSL encryption. If SSL is enabled,
-                                       you must also specify a "keystore" and a "keystore-password",
-                                       either through component level parameters (see below)
-                                       or as global SSL parameters (see `SSL/TLS support`_ section).
-keystore             --                This is the path to a Java keystore file.
-                                       If not specified here, then the global keystore will be used
-                                       (if defined, otherwise configuration error).
-keystore-password    --                The password for the Java keystore.
-                                       If not specified here, then the global keystore password will be used
-                                       (if defined, otherwise configuration error).
-keystore-type        JKS               The type of the Java keystore. This can be "JKS" or "PKCS12".
-                                       If not specified here, then the global keystore type will be used
-                                       (if defined, otherwise the default is JKS).
-exclude-protocols    SSLv3             Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
-ipFilter             false             Set this to true to enable ipFiltering for netty
-ipFilterRules        --                Define N netty ipFilter pattern rules with this config.
-==================   ================  ===================================================
+compression-type      none              This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
+ssl                   false             Set this to true to enable SSL encryption. If SSL is enabled,
+                                        you must also specify a "keystore" and a "keystore-password",
+                                        either through component level parameters (see below)
+                                        or as global SSL parameters (see `SSL/TLS support`_ section).
+keystore              --                This is the path to a Java keystore file.
+                                        If not specified here, then the global keystore will be used
+                                        (if defined, otherwise configuration error).
+keystore-password     --                The password for the Java keystore.
+                                        If not specified here, then the global keystore password will be used
+                                        (if defined, otherwise configuration error).
+keystore-type         JKS               The type of the Java keystore. This can be "JKS" or "PKCS12".
+                                        If not specified here, then the global keystore type will be used
+                                        (if defined, otherwise the default is JKS).
+exclude-protocols     SSLv3             Space-separated list of SSL/TLS protocols to exclude.
+                                        SSLv3 will always be excluded in addition to the protocols specified.
+include-protocols     --                Space-separated list of SSL/TLS protocols to include.
+                                        The enabled protocols will be the included protocols without the excluded protocols.
+                                        If included-protocols is empty, it includes every supported protocols.
+exclude-cipher-suites --                Space-separated list of cipher suites to exclude.
+include-cipher-suites --                Space-separated list of cipher suites to include.
+                                        The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
+                                        If included-cipher-suites is empty, it includes every supported cipher suites.
+ipFilter              false             Set this to true to enable ipFiltering for netty
+ipFilterRules         --                Define N netty ipFilter pattern rules with this config.
+==================    ================  ===================================================
 
 Example for agent named a1:
 
@@ -924,36 +940,44 @@ agent-principal and agent-keytab are the properties used by the
 Thrift source to authenticate to the kerberos KDC.
 Required properties are in **bold**.
 
-==================   ===========  ==================================================================
-Property Name        Default      Description
-==================   ===========  ==================================================================
-**channels**         --
-**type**             --           The component type name, needs to be ``thrift``
-**bind**             --           hostname or IP address to listen on
-**port**             --           Port # to bind to
-threads              --           Maximum number of worker threads to spawn
+===================   ===========  ==================================================================
+Property Name         Default      Description
+===================   ===========  ==================================================================
+**channels**          --
+**type**              --           The component type name, needs to be ``thrift``
+**bind**              --           hostname or IP address to listen on
+**port**              --           Port # to bind to
+threads               --           Maximum number of worker threads to spawn
 selector.type
 selector.*
-interceptors         --           Space separated list of interceptors
+interceptors          --           Space separated list of interceptors
 interceptors.*
-ssl                  false        Set this to true to enable SSL encryption. If SSL is enabled,
-                                  you must also specify a "keystore" and a "keystore-password",
-                                  either through component level parameters (see below)
-                                  or as global SSL parameters (see `SSL/TLS support`_ section)
-keystore             --           This is the path to a Java keystore file.
-                                  If not specified here, then the global keystore will be used
-                                  (if defined, otherwise configuration error).
-keystore-password    --           The password for the Java keystore.
-                                  If not specified here, then the global keystore password will be used
-                                  (if defined, otherwise configuration error).
-keystore-type        JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
-                                  If not specified here, then the global keystore type will be used
-                                  (if defined, otherwise the default is JKS).
-exclude-protocols    SSLv3        Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
-kerberos             false        Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab  are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
-agent-principal      --           The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
-agent-keytab         —-           The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.
-==================   ===========  ==================================================================
+ssl                   false        Set this to true to enable SSL encryption. If SSL is enabled,
+                                   you must also specify a "keystore" and a "keystore-password",
+                                   either through component level parameters (see below)
+                                   or as global SSL parameters (see `SSL/TLS support`_ section)
+keystore              --           This is the path to a Java keystore file.
+                                   If not specified here, then the global keystore will be used
+                                   (if defined, otherwise configuration error).
+keystore-password     --           The password for the Java keystore.
+                                   If not specified here, then the global keystore password will be used
+                                   (if defined, otherwise configuration error).
+keystore-type         JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
+                                   If not specified here, then the global keystore type will be used
+                                   (if defined, otherwise the default is JKS).
+exclude-protocols     SSLv3        Space-separated list of SSL/TLS protocols to exclude.
+                                   SSLv3 will always be excluded in addition to the protocols specified.
+include-protocols     --           Space-separated list of SSL/TLS protocols to include.
+                                   The enabled protocols will be the included protocols without the excluded protocols.
+                                   If included-protocols is empty, it includes every supported protocols.
+exclude-cipher-suites --           Space-separated list of cipher suites to exclude.
+include-cipher-suites --           Space-separated list of cipher suites to include.
+                                   The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
+
+kerberos              false        Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab  are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
+agent-principal       --           The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
+agent-keytab          —-           The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.
+===================   ===========  ==================================================================
 
 Example for agent named a1:
 
@@ -1770,26 +1794,48 @@ Syslog TCP Source
 
 The original, tried-and-true syslog TCP source.
 
-==============   ===========  ==============================================
-Property Name    Default      Description
-==============   ===========  ==============================================
-**channels**     --
-**type**         --           The component type name, needs to be ``syslogtcp``
-**host**         --           Host name or IP address to bind to
-**port**         --           Port # to bind to
-eventSize        2500         Maximum size of a single event line, in bytes
-keepFields       none         Setting this to 'all' will preserve the Priority,
-                              Timestamp and Hostname in the body of the event.
-                              A spaced separated list of fields to include
-                              is allowed as well. Currently, the following
-                              fields can be included: priority, version,
-                              timestamp, hostname. The values 'true' and 'false'
-                              have been deprecated in favor of 'all' and 'none'.
-selector.type                 replicating or multiplexing
-selector.*       replicating  Depends on the selector.type value
-interceptors     --           Space-separated list of interceptors
+===================   ===========  ==============================================
+Property Name         Default      Description
+===================   ===========  ==============================================
+**channels**          --
+**type**              --           The component type name, needs to be ``syslogtcp``
+**host**              --           Host name or IP address to bind to
+**port**              --           Port # to bind to
+eventSize             2500         Maximum size of a single event line, in bytes
+keepFields            none         Setting this to 'all' will preserve the Priority,
+                                   Timestamp and Hostname in the body of the event.
+                                   A spaced separated list of fields to include
+                                   is allowed as well. Currently, the following
+                                   fields can be included: priority, version,
+                                   timestamp, hostname. The values 'true' and 'false'
+                                   have been deprecated in favor of 'all' and 'none'.
+selector.type                      replicating or multiplexing
+selector.*            replicating  Depends on the selector.type value
+interceptors          --           Space-separated list of interceptors
 interceptors.*
-==============   ===========  ==============================================
+ssl                   false        Set this to true to enable SSL encryption. If SSL is enabled,
+                                   you must also specify a "keystore" and a "keystore-password",
+                                   either through component level parameters (see below)
+                                   or as global SSL parameters (see `SSL/TLS support`_ section).
+keystore              --           This is the path to a Java keystore file.
+                                   If not specified here, then the global keystore will be used
+                                   (if defined, otherwise configuration error).
+keystore-password     --           The password for the Java keystore.
+                                   If not specified here, then the global keystore password will be used
+                                   (if defined, otherwise configuration error).
+keystore-type         JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
+                                   If not specified here, then the global keystore type will be used
+                                   (if defined, otherwise the default is JKS).
+exclude-protocols     SSLv3        Space-separated list of SSL/TLS protocols to exclude.
+                                   SSLv3 will always be excluded in addition to the protocols specified.
+include-protocols     --           Space-separated list of SSL/TLS protocols to include.
+                                   The enabled protocols will be the included protocols without the excluded protocols.
+                                   If included-protocols is empty, it includes every supported protocols.
+exclude-cipher-suites --           Space-separated list of cipher suites to exclude.
+include-cipher-suites --           Space-separated list of cipher suites to include.
+                                   The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
+                                   If included-cipher-suites is empty, it includes every supported cipher suites.
+===================   ===========  ==============================================
 
 For example, a syslog TCP source for agent named a1:
 
@@ -1838,6 +1884,28 @@ selector.type         replicating       replicating, multiplexing, or custom
 selector.*            --                Depends on the ``selector.type`` value
 interceptors          --                Space-separated list of interceptors.
 interceptors.*
+ssl                   false             Set this to true to enable SSL encryption. If SSL is enabled,
+                                        you must also specify a "keystore" and a "keystore-password",
+                                        either through component level parameters (see below)
+                                        or as global SSL parameters (see `SSL/TLS support`_ section).
+keystore              --                This is the path to a Java keystore file.
+                                        If not specified here, then the global keystore will be used
+                                        (if defined, otherwise configuration error).
+keystore-password     --                The password for the Java keystore.
+                                        If not specified here, then the global keystore password will be used
+                                        (if defined, otherwise configuration error).
+keystore-type         JKS               The type of the Java keystore. This can be "JKS" or "PKCS12".
+                                        If not specified here, then the global keystore type will be used
+                                        (if defined, otherwise the default is JKS).
+exclude-protocols     SSLv3             Space-separated list of SSL/TLS protocols to exclude.
+                                        SSLv3 will always be excluded in addition to the protocols specified.
+include-protocols     --                Space-separated list of SSL/TLS protocols to include.
+                                        The enabled protocols will be the included protocols without the excluded protocols.
+                                        If included-protocols is empty, it includes every supported protocols.
+exclude-cipher-suites --                Space-separated list of cipher suites to exclude.
+include-cipher-suites --                Space-separated list of cipher suites to include.
+                                        The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
+                                        If included-cipher-suites is empty, it includes every supported cipher suites.
 ====================  ================  ==============================================
 
 For example, a multiport syslog TCP source for agent named a1:
@@ -1913,24 +1981,42 @@ selector.type         replicating                                   replicating
 selector.*                                                          Depends on the selector.type value
 interceptors          --                                            Space-separated list of interceptors
 interceptors.*
-enableSSL             false                                         Set the property true, to enable SSL. *HTTP Source does not support SSLv3.*
-excludeProtocols      SSLv3                                         Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded.
+ssl                   false                                         Set the property true, to enable SSL. *HTTP Source does not support SSLv3.*
+exclude-protocols     SSLv3                                         Space-separated list of SSL/TLS protocols to exclude.
+                                                                    SSLv3 will always be excluded in addition to the protocols specified.
+include-protocols     --                                            Space-separated list of SSL/TLS protocols to include.
+                                                                    The enabled protocols will be the included protocols without the excluded protocols.
+                                                                    If included-protocols is empty, it includes every supported protocols.
+exclude-cipher-suites --                                            Space-separated list of cipher suites to exclude.
+include-cipher-suites --                                            Space-separated list of cipher suites to include.
+                                                                    The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
 keystore                                                            Location of the keystore including keystore file name.
                                                                     If SSL is enabled but the keystore is not specified here,
                                                                     then the global keystore will be used
                                                                     (if defined, otherwise configuration error).
-keystorePassword                                                    Keystore password.
+keystore-password                                                   Keystore password.
                                                                     If SSL is enabled but the keystore password is not specified here,
                                                                     then the global keystore password will be used
                                                                     (if defined, otherwise configuration error).
+keystore-type         JKS                                           Keystore type. This can be "JKS" or "PKCS12".
 QueuedThreadPool.*                                                  Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool.
                                                                     N.B. QueuedThreadPool will only be used if at least one property of this class is set.
 HttpConfiguration.*                                                 Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration
 SslContextFactory.*                                                 Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only
-                                                                    applicable when *enableSSL* is set to true).
+                                                                    applicable when *ssl* is set to true).
 ServerConnector.*                                                   Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector
 =========================================================================================================================================================
 
+Deprecated Properties
+
+===============================  ===================  =============================================================================================
+Property Name                    Default              Description
+===============================  ===================  =============================================================================================
+keystorePassword                 --                   Use *keystore-password*. Deprecated value will be overwritten with the new one.
+excludeProtocols                 SSLv3                Use *exclude-protocols*. Deprecated value will be overwritten with the new one.
+enableSSL                        false                Use *ssl*. Deprecated value will be overwritten with the new one.
+===============================  ===================  =============================================================================================
+
 N.B. Jetty-specific settings are set using the setter-methods on the objects listed above. For full details see the Javadoc for these classes
 (`QueuedThreadPool <http://www.eclipse.org/jetty/javadoc/9.4.6.v20170531/org/eclipse/jetty/util/thread/QueuedThreadPool.html>`_,
 `HttpConfiguration <http://www.eclipse.org/jetty/javadoc/9.4.6.v20170531/org/eclipse/jetty/server/HttpConfiguration.html>`_,