You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/11/06 20:35:33 UTC
flume git commit: FLUME-2511. Allow configuration of enabled
protocols in Avro source and RpcClient.
Repository: flume
Updated Branches:
refs/heads/flume-1.5 a2175841e -> f25ff9e7d
FLUME-2511. Allow configuration of enabled protocols in Avro source and RpcClient.
(Tom White via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f25ff9e7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f25ff9e7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f25ff9e7
Branch: refs/heads/flume-1.5
Commit: f25ff9e7d7fff44a9f82aec051051c28f59f60ee
Parents: a217584
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Oct 22 18:27:36 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Nov 6 11:34:54 2014 -0800
----------------------------------------------------------------------
.../org/apache/flume/source/AvroSource.java | 14 ++++++++
flume-ng-doc/sphinx/FlumeUserGuide.rst | 36 +++++++++++---------
.../apache/flume/api/NettyAvroRpcClient.java | 27 ++++++++++++---
.../api/RpcClientConfigurationConstants.java | 1 +
4 files changed, 57 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 3eef687..59ee43a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.security.Security;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -141,12 +142,14 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
private static final String KEYSTORE_KEY = "keystore";
private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
private static final String KEYSTORE_TYPE_KEY = "keystore-type";
+ private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
private int port;
private String bindAddress;
private String compressionType;
private String keystore;
private String keystorePassword;
private String keystoreType;
+ private List<String> excludeProtocols;
private boolean enableSsl = false;
private boolean enableIpFilter;
private String patternRuleConfigDefinition;
@@ -178,6 +181,8 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
keystore = context.getString(KEYSTORE_KEY);
keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
+ excludeProtocols = Arrays.asList(
+ context.getString(EXCLUDE_PROTOCOLS, "SSLv2Hello SSLv3").split(" "));
if (enableSsl) {
Preconditions.checkNotNull(keystore,
@@ -501,6 +506,15 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
if (enableSsl) {
SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
sslEngine.setUseClientMode(false);
+ List<String> enabledProtocols = new ArrayList<String>();
+ for (String protocol : sslEngine.getEnabledProtocols()) {
+ if (!excludeProtocols.contains(protocol)) {
+ enabledProtocols.add(protocol);
+ }
+ }
+ sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+ logger.info("SSLEngine protocols enabled: " +
+ Arrays.asList(sslEngine.getEnabledProtocols()));
// addFirst() will make SSL handling the first stage of decoding
// and the last stage of encoding; this must be added after
// adding compression handling above.
http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index b2058f5..be0f593 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -688,26 +688,27 @@ When paired with the built-in Avro Sink on another (previous hop) Flume agent,
it can create tiered collection topologies.
Required properties are in **bold**.
-================== =========== ===================================================
-Property Name Default Description
-================== =========== ===================================================
+================== ================ ===================================================
+Property Name Default Description
+================== ================ ===================================================
**channels** --
-**type** -- The component type name, needs to be ``avro``
-**bind** -- hostname or IP address to listen on
-**port** -- Port # to bind to
-threads -- Maximum number of worker threads to spawn
+**type** -- The component type name, needs to be ``avro``
+**bind** -- hostname or IP address to listen on
+**port** -- Port # to bind to
+threads -- Maximum number of worker threads to spawn
selector.type
selector.*
-interceptors -- Space-separated list of interceptors
+interceptors -- Space-separated list of interceptors
interceptors.*
-compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource
-ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
-keystore -- This is the path to a Java keystore file. Required for SSL.
-keystore-password -- The password for the Java keystore. Required for SSL.
-keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12".
-ipFilter false Set this to true to enable ipFiltering for netty
-ipFilter.rules -- Define N netty ipFilter pattern rules with this config.
-================== =========== ===================================================
+compression-type none This can be "none" or "deflate". The compression-type must match the compression-type of matching AvroSource
+ssl false Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
+keystore -- This is the path to a Java keystore file. Required for SSL.
+keystore-password -- The password for the Java keystore. Required for SSL.
+keystore-type JKS The type of the Java keystore. This can be "JKS" or "PKCS12".
+exclude-protocols SSLv2Hello SSLv3 Space-separated list of SSL/TLS protocols to exclude
+ipFilter false Set this to true to enable ipFiltering for netty
+ipFilter.rules -- Define N netty ipFilter pattern rules with this config.
+================== ================ ===================================================
Example for agent named a1:
@@ -1660,7 +1661,7 @@ batches of the configured batch size.
Required properties are in **bold**.
========================== ===================================================== ===========================================================================================
-Property Name Default Description
+Property Name Default Description
========================== ===================================================== ===========================================================================================
**channel** --
**type** -- The component type name, needs to be ``avro``.
@@ -1677,6 +1678,7 @@ trust-all-certs false
truststore -- The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password -- The password for the specified truststore.
truststore-type JKS The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
+exclude-protocols SSLv2Hello SSLv3 Space-separated list of SSL/TLS protocols to exclude
maxIoWorkers 2 * the number of available processors in the machine The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.
========================== ===================================================== ===========================================================================================
http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index a2eb264..ad9b580 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -94,6 +96,7 @@ implements RpcClient {
private String truststore;
private String truststorePassword;
private String truststoreType;
+ private List<String> excludeProtocols;
private Transceiver transceiver;
private AvroSourceProtocol.Callback avroClient;
@@ -144,12 +147,13 @@ implements RpcClient {
bossExecutor, workerExecutor,
enableDeflateCompression, enableSsl, trustAllCerts,
compressionLevel, truststore, truststorePassword, truststoreType,
- maxIoWorkers);
+ excludeProtocols, maxIoWorkers);
} else {
socketChannelFactory = new SSLCompressionChannelFactory(
bossExecutor, workerExecutor,
enableDeflateCompression, enableSsl, trustAllCerts,
- compressionLevel, truststore, truststorePassword, truststoreType);
+ compressionLevel, truststore, truststorePassword, truststoreType,
+ excludeProtocols);
}
} else {
if (maxIoWorkers >= 1) {
@@ -603,6 +607,9 @@ implements RpcClient {
RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
truststoreType = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
+ excludeProtocols = Arrays.asList(properties.getProperty(
+ RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS, "SSLv2Hello SSLv3")
+ .split(" "));
String maxIoWorkersStr = properties.getProperty(
RpcClientConfigurationConstants.MAX_IO_WORKERS);
@@ -669,11 +676,12 @@ implements RpcClient {
private final String truststore;
private final String truststorePassword;
private final String truststoreType;
+ private final List<String> excludeProtocols;
public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
int compressionLevel, String truststore, String truststorePassword,
- String truststoreType) {
+ String truststoreType, List<String> excludeProtocols) {
super(bossExecutor, workerExecutor);
this.enableCompression = enableCompression;
this.enableSsl = enableSsl;
@@ -682,12 +690,13 @@ implements RpcClient {
this.truststore = truststore;
this.truststorePassword = truststorePassword;
this.truststoreType = truststoreType;
+ this.excludeProtocols = excludeProtocols;
}
public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
int compressionLevel, String truststore, String truststorePassword,
- String truststoreType, int maxIOWorkers) {
+ String truststoreType, List<String> excludeProtocols, int maxIOWorkers) {
super(bossExecutor, workerExecutor, maxIOWorkers);
this.enableCompression = enableCompression;
this.enableSsl = enableSsl;
@@ -696,6 +705,7 @@ implements RpcClient {
this.truststore = truststore;
this.truststorePassword = truststorePassword;
this.truststoreType = truststoreType;
+ this.excludeProtocols = excludeProtocols;
}
@Override
@@ -735,6 +745,15 @@ implements RpcClient {
sslContext.init(null, managers, null);
SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(true);
+ List<String> enabledProtocols = new ArrayList<String>();
+ for (String protocol : sslEngine.getEnabledProtocols()) {
+ if (!excludeProtocols.contains(protocol)) {
+ enabledProtocols.add(protocol);
+ }
+ }
+ sslEngine.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
+ logger.info("SSLEngine protocols enabled: " +
+ Arrays.asList(sslEngine.getEnabledProtocols()));
// addFirst() will make SSL handling the first stage of decoding
// and the last stage of encoding; this must be added after
// adding compression handling above.
http://git-wip-us.apache.org/repos/asf/flume/blob/f25ff9e7/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 136c504..33a2330 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -143,6 +143,7 @@ public final class RpcClientConfigurationConstants {
public static final String CONFIG_TRUSTSTORE = "truststore";
public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
+ public static final String CONFIG_EXCLUDE_PROTOCOLS = "exclude-protocols";
/**
* Configuration constants for the NettyAvroRpcClient