You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/11/06 20:35:33 UTC

flume git commit: FLUME-2511. Allow configuration of enabled protocols in Avro source and RpcClient.

Repository: flume
Updated Branches:
  refs/heads/flume-1.5 a2175841e -> f25ff9e7d


FLUME-2511. Allow configuration of enabled protocols in Avro source and RpcClient.

(Tom White via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f25ff9e7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f25ff9e7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f25ff9e7

Branch: refs/heads/flume-1.5
Commit: f25ff9e7d7fff44a9f82aec051051c28f59f60ee
Parents: a217584
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Oct 22 18:27:36 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Nov 6 11:34:54 2014 -0800

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     | 14 ++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 36 +++++++++++---------
 .../apache/flume/api/NettyAvroRpcClient.java    | 27 ++++++++++++---
 .../api/RpcClientConfigurationConstants.java    |  1 +
 4 files changed, 57 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 3eef687..59ee43a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.security.KeyStore;
 import java.security.Security;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -141,12 +142,14 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private static final String KEYSTORE_KEY = "keystore";
   private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
   private static final String KEYSTORE_TYPE_KEY = "keystore-type";
+  private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
   private int port;
   private String bindAddress;
   private String compressionType;
   private String keystore;
   private String keystorePassword;
   private String keystoreType;
+  private List<String> excludeProtocols;
   private boolean enableSsl = false;
   private boolean enableIpFilter;
   private String patternRuleConfigDefinition;
@@ -178,6 +181,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     keystore = context.getString(KEYSTORE_KEY);
     keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
     keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
+    excludeProtocols = Arrays.asList(
+        context.getString(EXCLUDE_PROTOCOLS, "SSLv2Hello SSLv3").split(" "));
 
     if (enableSsl) {
       Preconditions.checkNotNull(keystore,
@@ -501,6 +506,15 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
       if (enableSsl) {
         SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
         sslEngine.setUseClientMode(false);
+        List<String> enabledProtocols = new ArrayList<String>();
+        for (String protocol : sslEngine.getEnabledProtocols()) {
+          if (!excludeProtocols.contains(protocol)) {
+            enabledProtocols.add(protocol);
+          }
+        }
+        sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+        logger.info("SSLEngine protocols enabled: " +
+            Arrays.asList(sslEngine.getEnabledProtocols()));
         // addFirst() will make SSL handling the first stage of decoding
         // and the last stage of encoding this must be added after
         // adding compression handling above

http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index b2058f5..be0f593 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -688,26 +688,27 @@ When paired with the built-in Avro Sink on another (previous hop) Flume agent,
 it can create tiered collection topologies.
 Required properties are in **bold**.
 
-==================   ===========  ===================================================
-Property Name        Default      Description
-==================   ===========  ===================================================
+==================   ================  ===================================================
+Property Name        Default           Description
+==================   ================  ===================================================
 **channels**         --
-**type**             --           The component type name, needs to be ``avro``
-**bind**             --           hostname or IP address to listen on
-**port**             --           Port # to bind to
-threads              --           Maximum number of worker threads to spawn
+**type**             --                The component type name, needs to be ``avro``
+**bind**             --                hostname or IP address to listen on
+**port**             --                Port # to bind to
+threads              --                Maximum number of worker threads to spawn
 selector.type
 selector.*
-interceptors         --           Space-separated list of interceptors
+interceptors         --                Space-separated list of interceptors
 interceptors.*
-compression-type     none         This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-ssl                  false        Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
-keystore             --           This is the path to a Java keystore file. Required for SSL.
-keystore-password    --           The password for the Java keystore. Required for SSL.
-keystore-type        JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
-ipFilter             false        Set this to true to enable ipFiltering for netty
-ipFilter.rules       --           Define N netty ipFilter pattern rules with this config.
-==================   ===========  ===================================================
+compression-type     none              This can be "none" or "deflate".  The compression-type must match the compression-type of the matching Avro Sink
+ssl                  false             Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
+keystore             --                This is the path to a Java keystore file. Required for SSL.
+keystore-password    --                The password for the Java keystore. Required for SSL.
+keystore-type        JKS               The type of the Java keystore. This can be "JKS" or "PKCS12".
+exclude-protocols    SSLv2Hello SSLv3  Space-separated list of SSL/TLS protocols to exclude
+ipFilter             false             Set this to true to enable ipFiltering for netty
+ipFilter.rules       --                Define N netty ipFilter pattern rules with this config.
+==================   ================  ===================================================
 
 Example for agent named a1:
 
@@ -1660,7 +1661,7 @@ batches of the configured batch size.
 Required properties are in **bold**.
 
 ==========================   =====================================================  ===========================================================================================
-Property Name                Default  Description
+Property Name                Default                                                Description
 ==========================   =====================================================  ===========================================================================================
 **channel**                  --
 **type**                     --                                                     The component type name, needs to be ``avro``.
@@ -1677,6 +1678,7 @@ trust-all-certs              false
 truststore                   --                                                     The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
 truststore-password          --                                                     The password for the specified truststore.
 truststore-type              JKS                                                    The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
+exclude-protocols            SSLv2Hello SSLv3                                       Space-separated list of SSL/TLS protocols to exclude
 maxIoWorkers                 2 * the number of available processors in the machine  The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.
 ==========================   =====================================================  ===========================================================================================
 

http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index a2eb264..ad9b580 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.KeyStore;
 import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -94,6 +96,7 @@ implements RpcClient {
   private String truststore;
   private String truststorePassword;
   private String truststoreType;
+  private List<String> excludeProtocols;
 
   private Transceiver transceiver;
   private AvroSourceProtocol.Callback avroClient;
@@ -144,12 +147,13 @@ implements RpcClient {
             bossExecutor, workerExecutor,
             enableDeflateCompression, enableSsl, trustAllCerts,
             compressionLevel, truststore, truststorePassword, truststoreType,
-            maxIoWorkers);
+            excludeProtocols, maxIoWorkers);
         } else {
           socketChannelFactory = new SSLCompressionChannelFactory(
             bossExecutor, workerExecutor,
             enableDeflateCompression, enableSsl, trustAllCerts,
-            compressionLevel, truststore, truststorePassword, truststoreType);
+            compressionLevel, truststore, truststorePassword, truststoreType,
+            excludeProtocols);
         }
       } else {
         if (maxIoWorkers >= 1) {
@@ -603,6 +607,9 @@ implements RpcClient {
         RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
     truststoreType = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
+    excludeProtocols = Arrays.asList(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS, "SSLv2Hello SSLv3")
+        .split(" "));
 
     String maxIoWorkersStr = properties.getProperty(
       RpcClientConfigurationConstants.MAX_IO_WORKERS);
@@ -669,11 +676,12 @@ implements RpcClient {
     private final String truststore;
     private final String truststorePassword;
     private final String truststoreType;
+    private final List<String> excludeProtocols;
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
         int compressionLevel, String truststore, String truststorePassword,
-        String truststoreType) {
+        String truststoreType, List<String> excludeProtocols) {
       super(bossExecutor, workerExecutor);
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
@@ -682,12 +690,13 @@ implements RpcClient {
       this.truststore = truststore;
       this.truststorePassword = truststorePassword;
       this.truststoreType = truststoreType;
+      this.excludeProtocols = excludeProtocols;
     }
 
     public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
         boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
         int compressionLevel, String truststore, String truststorePassword,
-        String truststoreType, int maxIOWorkers) {
+        String truststoreType, List<String> excludeProtocols, int maxIOWorkers) {
       super(bossExecutor, workerExecutor, maxIOWorkers);
       this.enableCompression = enableCompression;
       this.enableSsl = enableSsl;
@@ -696,6 +705,7 @@ implements RpcClient {
       this.truststore = truststore;
       this.truststorePassword = truststorePassword;
       this.truststoreType = truststoreType;
+      this.excludeProtocols = excludeProtocols;
     }
 
     @Override
@@ -735,6 +745,15 @@ implements RpcClient {
           sslContext.init(null, managers, null);
           SSLEngine sslEngine = sslContext.createSSLEngine();
           sslEngine.setUseClientMode(true);
+          List<String> enabledProtocols = new ArrayList<String>();
+          for (String protocol : sslEngine.getEnabledProtocols()) {
+            if (!excludeProtocols.contains(protocol)) {
+              enabledProtocols.add(protocol);
+            }
+          }
+          sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+          logger.info("SSLEngine protocols enabled: " +
+              Arrays.asList(sslEngine.getEnabledProtocols()));
           // addFirst() will make SSL handling the first stage of decoding
           // and the last stage of encoding this must be added after
           // adding compression handling above

http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 136c504..33a2330 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -143,6 +143,7 @@ public final class RpcClientConfigurationConstants {
   public static final String CONFIG_TRUSTSTORE = "truststore";
   public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
   public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
+  public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols";
 
   /**
    * Configuration constants for the NettyAvroRpcClient