You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/10/05 09:42:45 UTC

[1/2] flume git commit: FLUME-3269: Support JSSE keystore/trustore -D system properties

Repository: flume
Updated Branches:
  refs/heads/trunk 1b4378396 -> c5168c902


http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
index 07d9c90..0048e61 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/ThriftRpcClient.java
@@ -24,6 +24,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.thrift.Status;
 import org.apache.flume.thrift.ThriftFlumeEvent;
 import org.apache.flume.thrift.ThriftSourceProtocol;
+import org.apache.flume.util.SSLUtil;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TFastFramedTransport;
@@ -318,11 +319,13 @@ public class ThriftRpcClient extends AbstractRpcClient {
           RpcClientConfigurationConstants.CONFIG_SSL));
       if (enableSsl) {
         truststore = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE);
+            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE, SSLUtil.getGlobalTruststorePath());
         truststorePassword = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
+            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD,
+            SSLUtil.getGlobalTruststorePassword());
         truststoreType = properties.getProperty(
-            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
+            RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE,
+            SSLUtil.getGlobalTruststoreType("JKS"));
         String excludeProtocolsStr = properties.getProperty(
             RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
         if (excludeProtocolsStr == null) {
@@ -520,7 +523,8 @@ public class ThriftRpcClient extends AbstractRpcClient {
       KeyStore ts = null;
       if (truststore != null && truststoreType != null) {
         ts = KeyStore.getInstance(truststoreType);
-        ts.load(new FileInputStream(truststore), truststorePassword.toCharArray());
+        ts.load(new FileInputStream(truststore),
+            truststorePassword != null ? truststorePassword.toCharArray() : null);
         tmf.init(ts);
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java b/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
new file mode 100644
index 0000000..02fe8ed
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/SSLUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SSLUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SSLUtil.class);
+
+  private static final String SYS_PROP_KEYSTORE_PATH = "javax.net.ssl.keyStore";
+  private static final String SYS_PROP_KEYSTORE_PASSWORD = "javax.net.ssl.keyStorePassword";
+  private static final String SYS_PROP_KEYSTORE_TYPE = "javax.net.ssl.keyStoreType";
+  private static final String SYS_PROP_TRUSTSTORE_PATH = "javax.net.ssl.trustStore";
+  private static final String SYS_PROP_TRUSTSTORE_PASSWORD = "javax.net.ssl.trustStorePassword";
+  private static final String SYS_PROP_TRUSTSTORE_TYPE = "javax.net.ssl.trustStoreType";
+
+  private static final String ENV_VAR_KEYSTORE_PATH = "FLUME_SSL_KEYSTORE_PATH";
+  private static final String ENV_VAR_KEYSTORE_PASSWORD = "FLUME_SSL_KEYSTORE_PASSWORD";
+  private static final String ENV_VAR_KEYSTORE_TYPE = "FLUME_SSL_KEYSTORE_TYPE";
+  private static final String ENV_VAR_TRUSTSTORE_PATH = "FLUME_SSL_TRUSTSTORE_PATH";
+  private static final String ENV_VAR_TRUSTSTORE_PASSWORD = "FLUME_SSL_TRUSTSTORE_PASSWORD";
+  private static final String ENV_VAR_TRUSTSTORE_TYPE = "FLUME_SSL_TRUSTSTORE_TYPE";
+
+  private static final String DESCR_KEYSTORE_PATH = "keystore path";
+  private static final String DESCR_KEYSTORE_PASSWORD = "keystore password";
+  private static final String DESCR_KEYSTORE_TYPE = "keystore type";
+  private static final String DESCR_TRUSTSTORE_PATH = "truststore path";
+  private static final String DESCR_TRUSTSTORE_PASSWORD = "truststore password";
+  private static final String DESCR_TRUSTSTORE_TYPE = "truststore type";
+
+  public static void initGlobalSSLParameters() {
+    initSysPropFromEnvVar(
+        SYS_PROP_KEYSTORE_PATH, ENV_VAR_KEYSTORE_PATH, DESCR_KEYSTORE_PATH);
+    initSysPropFromEnvVar(
+        SYS_PROP_KEYSTORE_PASSWORD, ENV_VAR_KEYSTORE_PASSWORD, DESCR_KEYSTORE_PASSWORD);
+    initSysPropFromEnvVar(
+        SYS_PROP_KEYSTORE_TYPE, ENV_VAR_KEYSTORE_TYPE, DESCR_KEYSTORE_TYPE);
+    initSysPropFromEnvVar(
+        SYS_PROP_TRUSTSTORE_PATH, ENV_VAR_TRUSTSTORE_PATH, DESCR_TRUSTSTORE_PATH);
+    initSysPropFromEnvVar(
+        SYS_PROP_TRUSTSTORE_PASSWORD, ENV_VAR_TRUSTSTORE_PASSWORD, DESCR_TRUSTSTORE_PASSWORD);
+    initSysPropFromEnvVar(
+        SYS_PROP_TRUSTSTORE_TYPE, ENV_VAR_TRUSTSTORE_TYPE, DESCR_TRUSTSTORE_TYPE);
+  }
+
+  private static void initSysPropFromEnvVar(String sysPropName, String envVarName,
+                                            String description) {
+    if (System.getProperty(sysPropName) != null) {
+      LOGGER.debug("Global SSL " + description + " has been initialized from system property.");
+    } else {
+      String envVarValue = System.getenv(envVarName);
+      if (envVarValue != null) {
+        System.setProperty(sysPropName, envVarValue);
+        LOGGER.debug("Global SSL " + description +
+            " has been initialized from environment variable.");
+      } else {
+        LOGGER.debug("No global SSL " + description + " specified.");
+      }
+    }
+  }
+
+  public static String getGlobalKeystorePath() {
+    return System.getProperty(SYS_PROP_KEYSTORE_PATH);
+  }
+
+  public static String getGlobalKeystorePassword() {
+    return System.getProperty(SYS_PROP_KEYSTORE_PASSWORD);
+  }
+
+  public static String getGlobalKeystoreType(String defaultValue) {
+    String sysPropValue = System.getProperty(SYS_PROP_KEYSTORE_TYPE);
+    return sysPropValue != null ? sysPropValue : defaultValue;
+  }
+
+  public static String getGlobalTruststorePath() {
+    return System.getProperty(SYS_PROP_TRUSTSTORE_PATH);
+  }
+
+  public static String getGlobalTruststorePassword() {
+    return System.getProperty(SYS_PROP_TRUSTSTORE_PASSWORD);
+  }
+
+  public static String getGlobalTruststoreType(String defaultValue) {
+    String sysPropValue = System.getProperty(SYS_PROP_TRUSTSTORE_TYPE);
+    return sysPropValue != null ? sysPropValue : defaultValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
index 2eee0ef..8ae86e0 100644
--- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java
@@ -41,9 +41,9 @@ import java.util.concurrent.TimeoutException;
 public class TestThriftRpcClient {
   private static final String SEQ = "sequence";
   private final Properties props = new Properties();
-  ThriftRpcClient client;
-  ThriftTestingSource src;
-  int port;
+  private ThriftRpcClient client;
+  private ThriftTestingSource src;
+  private int port;
 
   @Before
   public void setUp() throws Exception {
@@ -70,7 +70,7 @@ public class TestThriftRpcClient {
    * @param count
    * @throws Exception
    */
-  public static void insertEvents(RpcClient client, int count) throws Exception {
+  private  static void insertEvents(RpcClient client, int count) throws Exception {
     for (int i = 0; i < count; i++) {
       Map<String, String> header = new HashMap<String, String>();
       header.put(SEQ, String.valueOf(i));
@@ -87,7 +87,7 @@ public class TestThriftRpcClient {
    * @throws Exception
    */
 
-  public static void insertAsBatch(RpcClient client, int start,
+  private static void insertAsBatch(RpcClient client, int start,
                                    int limit) throws Exception {
     List<Event> events = new ArrayList<Event>();
     for (int i = start; i <= limit; i++) {

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilTest.java
new file mode 100644
index 0000000..944a485
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/AbstractSSLUtilTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public abstract class AbstractSSLUtilTest {
+
+  @Parameters
+  public static Collection<?> data() {
+    return Arrays.asList(new Object[][]{
+        // system property value, environment variable value, expected value
+        { null, null, null },
+        { "sysprop", null, "sysprop" },
+        { null, "envvar", "envvar" },
+        { "sysprop", "envvar", "sysprop" }
+    });
+  }
+
+  protected String sysPropValue;
+  protected String envVarValue;
+  protected String expectedValue;
+
+  protected AbstractSSLUtilTest(String sysPropValue, String envVarValue, String expectedValue) {
+    this.sysPropValue = sysPropValue;
+    this.envVarValue = envVarValue;
+    this.expectedValue = expectedValue;
+  }
+
+  protected abstract String getSysPropName();
+
+  protected abstract String getEnvVarName();
+
+  @Before
+  public void setUp() {
+    setSysProp(getSysPropName(), sysPropValue);
+    setEnvVar(getEnvVarName(), envVarValue);
+  }
+
+  @After
+  public void tearDown() {
+    setSysProp(getSysPropName(), null);
+    setEnvVar(getEnvVarName(), null);
+  }
+
+  private static void setSysProp(String name, String value) {
+    if (value != null) {
+      System.setProperty(name, value);
+    } else {
+      System.clearProperty(name);
+    }
+  }
+
+  private static void setEnvVar(String name, String value) {
+    try {
+      injectEnvironmentVariable(name, value);
+    } catch (ReflectiveOperationException e) {
+      throw new AssertionError("Test setup  failed.", e);
+    }
+  }
+
+  // based on https://dzone.com/articles/how-to-change-environment-variables-in-java
+  private static void injectEnvironmentVariable(String key, String value)
+      throws ReflectiveOperationException {
+    Class<?> processEnvironment = Class.forName("java.lang.ProcessEnvironment");
+    Field unmodifiableMapField = getAccessibleField(processEnvironment,
+        "theUnmodifiableEnvironment");
+    Object unmodifiableMap = unmodifiableMapField.get(null);
+    injectIntoUnmodifiableMap(key, value, unmodifiableMap);
+    Field mapField = getAccessibleField(processEnvironment, "theEnvironment");
+    Map<String, String> map = (Map<String, String>) mapField.get(null);
+    if (value != null) {
+      map.put(key, value);
+    } else {
+      map.remove(key);
+    }
+  }
+
+  private static Field getAccessibleField(Class<?> clazz, String fieldName)
+      throws NoSuchFieldException {
+    Field field = clazz.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    return field;
+  }
+
+  private static void injectIntoUnmodifiableMap(String key, String value, Object map)
+      throws ReflectiveOperationException {
+    Class unmodifiableMap = Class.forName("java.util.Collections$UnmodifiableMap");
+    Field field = getAccessibleField(unmodifiableMap, "m");
+    Object obj = field.get(map);
+    if (value != null) {
+      ((Map<String, String>) obj).put(key, value);
+    } else {
+      ((Map<String, String>) obj).remove(key);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePasswordTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePasswordTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePasswordTest.java
new file mode 100644
index 0000000..c54b309
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePasswordTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilKeystorePasswordTest extends AbstractSSLUtilTest {
+
+  public SSLUtilKeystorePasswordTest(String sysPropValue, String envVarValue,
+                                     String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.keyStorePassword";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_KEYSTORE_PASSWORD";
+  }
+
+  @Test
+  public void testKeystorePassword() {
+    SSLUtil.initGlobalSSLParameters();
+    String keystorePassword = SSLUtil.getGlobalKeystorePassword();
+
+    Assert.assertEquals(expectedValue, keystorePassword);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePathTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePathTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePathTest.java
new file mode 100644
index 0000000..a7b3206
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystorePathTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilKeystorePathTest extends AbstractSSLUtilTest {
+
+  public SSLUtilKeystorePathTest(String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.keyStore";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_KEYSTORE_PATH";
+  }
+
+  @Test
+  public void testKeystorePath() {
+    SSLUtil.initGlobalSSLParameters();
+    String keystorePath = SSLUtil.getGlobalKeystorePath();
+
+    Assert.assertEquals(expectedValue, keystorePath);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeTest.java
new file mode 100644
index 0000000..a8c00c7
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilKeystoreTypeTest extends AbstractSSLUtilTest {
+
+  public SSLUtilKeystoreTypeTest(String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.keyStoreType";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_KEYSTORE_TYPE";
+  }
+
+  @Test
+  public void testKeystoreType() {
+    SSLUtil.initGlobalSSLParameters();
+    String keystoreType = SSLUtil.getGlobalKeystoreType(null);
+
+    Assert.assertEquals(expectedValue, keystoreType);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeWithDefaultTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeWithDefaultTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeWithDefaultTest.java
new file mode 100644
index 0000000..d30e30a
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilKeystoreTypeWithDefaultTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class SSLUtilKeystoreTypeWithDefaultTest extends AbstractSSLUtilTest {
+
+  @Parameters
+  public static Collection<?> data() {
+    return Arrays.asList(new Object[][]{
+        // system property value, environment variable value, expected value
+        { null, null, "default" },
+        { "sysprop", null, "sysprop" },
+        { null, "envvar", "envvar" },
+        { "sysprop", "envvar", "sysprop" }
+    });
+  }
+
+  public SSLUtilKeystoreTypeWithDefaultTest(String sysPropValue, String envVarValue,
+                                            String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.keyStoreType";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_KEYSTORE_TYPE";
+  }
+
+  @Test
+  public void testKeystoreType() {
+    SSLUtil.initGlobalSSLParameters();
+    String keystoreType = SSLUtil.getGlobalKeystoreType("default");
+
+    Assert.assertEquals(expectedValue, keystoreType);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePasswordTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePasswordTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePasswordTest.java
new file mode 100644
index 0000000..9d69d44
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePasswordTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilTruststorePasswordTest extends AbstractSSLUtilTest {
+
+  public SSLUtilTruststorePasswordTest(String sysPropValue, String envVarValue,
+                                       String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.trustStorePassword";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_TRUSTSTORE_PASSWORD";
+  }
+
+  @Test
+  public void testTruststorePassword() {
+    SSLUtil.initGlobalSSLParameters();
+    String truststorePassword = SSLUtil.getGlobalTruststorePassword();
+
+    Assert.assertEquals(expectedValue, truststorePassword);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePathTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePathTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePathTest.java
new file mode 100644
index 0000000..c3e23c6
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststorePathTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilTruststorePathTest extends AbstractSSLUtilTest {
+
+  public SSLUtilTruststorePathTest(String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.trustStore";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_TRUSTSTORE_PATH";
+  }
+
+  @Test
+  public void testTruststorePath() {
+    SSLUtil.initGlobalSSLParameters();
+    String truststorePath = SSLUtil.getGlobalTruststorePath();
+
+    Assert.assertEquals(expectedValue, truststorePath);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeTest.java
new file mode 100644
index 0000000..5ef080b
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SSLUtilTruststoreTypeTest extends AbstractSSLUtilTest {
+
+  public SSLUtilTruststoreTypeTest(String sysPropValue, String envVarValue, String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.trustStoreType";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_TRUSTSTORE_TYPE";
+  }
+
+  @Test
+  public void testTruststoreType() {
+    SSLUtil.initGlobalSSLParameters();
+    String truststoreType = SSLUtil.getGlobalTruststoreType(null);
+
+    Assert.assertEquals(expectedValue, truststoreType);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeWithDefaultTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeWithDefaultTest.java b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeWithDefaultTest.java
new file mode 100644
index 0000000..34dda4f
--- /dev/null
+++ b/flume-ng-sdk/src/test/java/org/apache/flume/util/SSLUtilTruststoreTypeWithDefaultTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class SSLUtilTruststoreTypeWithDefaultTest extends AbstractSSLUtilTest {
+
+  @Parameters
+  public static Collection<?> data() {
+    return Arrays.asList(new Object[][]{
+        // system property value, environment variable value, expected value
+        { null, null, "default" },
+        { "sysprop", null, "sysprop" },
+        { null, "envvar", "envvar" },
+        { "sysprop", "envvar", "sysprop" }
+    });
+  }
+
+  public SSLUtilTruststoreTypeWithDefaultTest(String sysPropValue, String envVarValue,
+                                              String expectedValue) {
+    super(sysPropValue, envVarValue, expectedValue);
+  }
+
+  @Override
+  protected String getSysPropName() {
+    return "javax.net.ssl.trustStoreType";
+  }
+
+  @Override
+  protected String getEnvVarName() {
+    return "FLUME_SSL_TRUSTSTORE_TYPE";
+  }
+
+  @Test
+  public void testTruststoreType() {
+    SSLUtil.initGlobalSSLParameters();
+    String truststoreType = SSLUtil.getGlobalTruststoreType("default");
+
+    Assert.assertEquals(expectedValue, truststoreType);
+  }
+
+}


[2/2] flume git commit: FLUME-3269: Support JSSE keystore/trustore -D system properties

Posted by sz...@apache.org.
FLUME-3269: Support JSSE keystore/trustore -D system properties

It makes possible to specify global/common SSL keystore parameters (path,
password and type) at Flume agent (process) level for all sources/sinks.
In this way, it is not necessary to define (=copy) the SSL config for each
component in the agent config.

The global SSL parameters can be specified through the standard -D JSSE
system properties or in environment variables.
Component level configuration is still possible.

Priority:
 1. component parameters in agent config
 2. -D system properties
 2. environment variables

This closes #228

Reviewers: Ferenc Szabo, Tristan Stevens, Endre Major

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c5168c90
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c5168c90
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c5168c90

Branch: refs/heads/trunk
Commit: c5168c902634e8ea1f25ec578ed0b7055b246d68
Parents: 1b43783
Author: Peter Turcsanyi <tu...@cloudera.com>
Authored: Fri Oct 5 11:40:28 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Fri Oct 5 11:40:28 2018 +0200

----------------------------------------------------------------------
 .../apache/flume/client/avro/AvroCLIClient.java |   3 +
 .../org/apache/flume/source/AvroSource.java     |   8 +-
 .../org/apache/flume/source/ThriftSource.java   |   8 +-
 .../apache/flume/source/http/HTTPSource.java    |   9 +-
 .../org/apache/flume/sink/TestAvroSink.java     | 253 +++++++++----------
 .../org/apache/flume/sink/TestThriftSink.java   | 167 ++++++------
 .../org/apache/flume/source/TestAvroSource.java |  31 ++-
 .../apache/flume/source/TestThriftSource.java   |  47 +++-
 .../flume/source/http/TestHTTPSource.java       | 134 ++++++----
 flume-ng-doc/sphinx/FlumeUserGuide.rst          | 217 +++++++++++++---
 .../java/org/apache/flume/node/Application.java |   8 +-
 .../apache/flume/api/NettyAvroRpcClient.java    |  15 +-
 .../org/apache/flume/api/ThriftRpcClient.java   |  12 +-
 .../java/org/apache/flume/util/SSLUtil.java     | 106 ++++++++
 .../apache/flume/api/TestThriftRpcClient.java   |  10 +-
 .../apache/flume/util/AbstractSSLUtilTest.java  | 124 +++++++++
 .../flume/util/SSLUtilKeystorePasswordTest.java |  49 ++++
 .../flume/util/SSLUtilKeystorePathTest.java     |  48 ++++
 .../flume/util/SSLUtilKeystoreTypeTest.java     |  48 ++++
 .../SSLUtilKeystoreTypeWithDefaultTest.java     |  64 +++++
 .../util/SSLUtilTruststorePasswordTest.java     |  49 ++++
 .../flume/util/SSLUtilTruststorePathTest.java   |  48 ++++
 .../flume/util/SSLUtilTruststoreTypeTest.java   |  48 ++++
 .../SSLUtilTruststoreTypeWithDefaultTest.java   |  64 +++++
 24 files changed, 1233 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
index 242c821..0f0fda6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
@@ -44,6 +44,7 @@ import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.util.SSLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,6 +67,8 @@ public class AvroCLIClient {
   private int sent;
 
   public static void main(String[] args) {
+    SSLUtil.initGlobalSSLParameters();
+
     AvroCLIClient client = new AvroCLIClient();
 
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index a105bbe..e7b12bd 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -41,6 +41,7 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.apache.flume.util.SSLUtil;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
@@ -180,9 +181,10 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     }
 
     enableSsl = context.getBoolean(SSL_KEY, false);
-    keystore = context.getString(KEYSTORE_KEY);
-    keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
-    keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
+    keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath());
+    keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY,
+        SSLUtil.getGlobalKeystorePassword());
+    keystoreType = context.getString(KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType("JKS"));
     String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS);
     if (excludeProtocolsStr == null) {
       excludeProtocols.add("SSLv3");

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 33c37f2..637c42e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.thrift.Status;
 import org.apache.flume.thrift.ThriftSourceProtocol;
 import org.apache.flume.thrift.ThriftFlumeEvent;
+import org.apache.flume.util.SSLUtil;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -160,9 +161,10 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
 
     enableSsl = context.getBoolean(SSL_KEY, false);
     if (enableSsl) {
-      keystore = context.getString(KEYSTORE_KEY);
-      keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
-      keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
+      keystore = context.getString(KEYSTORE_KEY, SSLUtil.getGlobalKeystorePath());
+      keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY,
+          SSLUtil.getGlobalKeystorePassword());
+      keystoreType = context.getString(KEYSTORE_TYPE_KEY, SSLUtil.getGlobalKeystoreType("JKS"));
       String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS);
       if (excludeProtocolsStr == null) {
         excludeProtocols.add("SSLv3");

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index d14bde2..e9324fb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -28,6 +28,7 @@ import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractSource;
 import org.apache.flume.tools.FlumeBeanConfigurator;
 import org.apache.flume.tools.HTTPServerConstraintUtil;
+import org.apache.flume.util.SSLUtil;
 import org.eclipse.jetty.server.HttpConfiguration;
 import org.eclipse.jetty.server.HttpConnectionFactory;
 import org.eclipse.jetty.server.SecureRequestCustomizer;
@@ -130,11 +131,13 @@ public class HTTPSource extends AbstractSource implements
 
       if (sslEnabled) {
         LOG.debug("SSL configuration enabled");
-        keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE);
+        keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE,
+                SSLUtil.getGlobalKeystorePath());
         Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),
                                     "Keystore is required for SSL Conifguration" );
-        keyStorePassword =
-            context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
+        keyStorePassword = context.getString(
+                HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD,
+                SSLUtil.getGlobalKeystorePassword());
         Preconditions.checkArgument(keyStorePassword != null,
             "Keystore password is required for SSL Configuration");
         String excludeProtocolsStr =

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 8b6f493..cc2c91a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -103,13 +103,8 @@ public class TestAvroSink {
     sink = new AvroSink();
     channel = new MemoryChannel();
 
-    Context context = new Context();
+    Context context = createBaseContext();
 
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
     if (compressionType.equals("deflate")) {
       context.put("compression-type", compressionType);
       context.put("compression-level", Integer.toString(compressionLevel));
@@ -121,6 +116,28 @@ public class TestAvroSink {
     Configurables.configure(channel, context);
   }
 
+  private Context createBaseContext() {
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    return context;
+  }
+
+  private Server createServer(AvroSourceProtocol protocol)
+      throws IllegalAccessException, InstantiationException {
+
+    Server server = new NettyServer(new SpecificResponder(
+        AvroSourceProtocol.class, protocol), new InetSocketAddress(
+        hostname, port));
+
+    return server;
+  }
+
   @Test
   public void testLifecycle() throws InterruptedException,
       InstantiationException, IllegalAccessException {
@@ -384,85 +401,101 @@ public class TestAvroSink {
   }
 
   @Test
-  public void testSslProcess() throws InterruptedException,
+  public void testSslProcessTrustAllCerts() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
     setUp();
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-    Server server = createSslServer(new MockAvroServer());
-
-    server.start();
-
-    Context context = new Context();
 
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
     context.put("trust-all-certs", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
 
     Configurables.configure(sink, context);
 
-    sink.start();
-    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-        LifecycleState.START_OR_ERROR, 5000));
+    doTestSslProcess();
+  }
 
-    Transaction transaction = channel.getTransaction();
+  @Test
+  public void testSslProcessWithComponentTruststore() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
 
-    transaction.begin();
-    for (int i = 0; i < 10; i++) {
-      channel.put(event);
-    }
-    transaction.commit();
-    transaction.close();
+    Context context = createBaseContext();
+    context.put("ssl", String.valueOf(true));
+    context.put("truststore", "src/test/resources/truststore.jks");
+    context.put("truststore-password", "password");
 
-    for (int i = 0; i < 5; i++) {
-      Sink.Status status = sink.process();
-      Assert.assertEquals(Sink.Status.READY, status);
-    }
+    Configurables.configure(sink, context);
 
-    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+    doTestSslProcess();
+  }
 
-    sink.stop();
-    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-        LifecycleState.STOP_OR_ERROR, 5000));
+  @Test
+  public void testSslProcessWithComponentTruststoreNoPassword() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
 
-    server.close();
+    Context context = createBaseContext();
+    context.put("ssl", String.valueOf(true));
+    context.put("truststore", "src/test/resources/truststore.jks");
+
+    Configurables.configure(sink, context);
+
+    doTestSslProcess();
   }
 
   @Test
-  public void testSslProcessWithTrustStore() throws InterruptedException,
+  public void testSslProcessWithGlobalTruststore() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
     setUp();
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-    Server server = createSslServer(new MockAvroServer());
 
-    server.start();
+    System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststore.jks");
+    System.setProperty("javax.net.ssl.trustStorePassword", "password");
 
-    Context context = new Context();
+    Context context = createBaseContext();
+    context.put("ssl", String.valueOf(true));
 
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    Configurables.configure(sink, context);
+
+    doTestSslProcess();
+
+    System.clearProperty("javax.net.ssl.trustStore");
+    System.clearProperty("javax.net.ssl.trustStorePassword");
+  }
+
+  @Test
+  public void testSslProcessWithGlobalTruststoreNoPassword() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+
+    System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststore.jks");
+
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
-    context.put("truststore", "src/test/resources/truststore.jks");
-    context.put("truststore-password", "password");
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
 
     Configurables.configure(sink, context);
 
+    doTestSslProcess();
+
+    System.clearProperty("javax.net.ssl.trustStore");
+  }
+
+  private void doTestSslProcess() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    Server server = createSslServer(new MockAvroServer());
+    server.start();
+
     sink.start();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.START_OR_ERROR, 5000));
 
     Transaction transaction = channel.getTransaction();
-
     transaction.begin();
+
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     for (int i = 0; i < 10; i++) {
       channel.put(event);
     }
+
     transaction.commit();
     transaction.close();
 
@@ -480,16 +513,6 @@ public class TestAvroSink {
     server.close();
   }
 
-  private Server createServer(AvroSourceProtocol protocol)
-      throws IllegalAccessException, InstantiationException {
-
-    Server server = new NettyServer(new SpecificResponder(
-        AvroSourceProtocol.class, protocol), new InetSocketAddress(
-        hostname, port));
-
-    return server;
-  }
-
   @Test
   public void testSslWithCompression() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
@@ -538,15 +561,9 @@ public class TestAvroSink {
     Event event = EventBuilder.withBody("Hello avro",
         Charset.forName("UTF8"));
 
-    context = new Context();
-
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    context = createBaseContext();
     context.put("ssl", String.valueOf(true));
     context.put("trust-all-certs", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
     context.put("compression-type", "deflate");
     context.put("compression-level", Integer.toString(6));
 
@@ -591,57 +608,25 @@ public class TestAvroSink {
 
   @Test
   public void testSslSinkWithNonSslServer() throws InterruptedException,
-      EventDeliveryException, InstantiationException, IllegalAccessException {
+      InstantiationException, IllegalAccessException {
     setUp();
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-    Server server = createServer(new MockAvroServer());
 
+    Server server = createServer(new MockAvroServer());
     server.start();
 
-    Context context = new Context();
-
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
     context.put("trust-all-certs", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
 
     Configurables.configure(sink, context);
 
-    sink.start();
-    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-        LifecycleState.START_OR_ERROR, 5000));
-
-    Transaction transaction = channel.getTransaction();
-
-    transaction.begin();
-    for (int i = 0; i < 10; i++) {
-      channel.put(event);
-    }
-    transaction.commit();
-    transaction.close();
-
-    boolean failed = false;
-    try {
-      for (int i = 0; i < 5; i++) {
-        sink.process();
-        failed = true;
-      }
-    } catch (EventDeliveryException ex) {
-      logger.info("Correctly failed to send event", ex);
-    }
-
-
-    sink.stop();
-    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-        LifecycleState.STOP_OR_ERROR, 5000));
+    boolean failed = doRequestWhenFailureExpected();
 
     server.close();
 
-    if (failed) {
-      Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, that's wrong.");
+    if (!failed) {
+      Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, " +
+          "that's wrong.");
     }
 
     SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
@@ -649,61 +634,59 @@ public class TestAvroSink {
   }
 
   @Test
-  public void testSslSinkWithNonTrustedCert()
-      throws InterruptedException, EventDeliveryException, InstantiationException,
-             IllegalAccessException {
+  public void testSslSinkWithNonTrustedCert() throws InterruptedException,
+      InstantiationException, IllegalAccessException {
     setUp();
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-    Server server = createSslServer(new MockAvroServer());
 
+    Server server = createSslServer(new MockAvroServer());
     server.start();
 
-    Context context = new Context();
-
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
 
     Configurables.configure(sink, context);
 
+    boolean failed = doRequestWhenFailureExpected();
+
+    server.close();
+
+    if (!failed) {
+      Assert.fail("SSL-enabled sink successfully connected to a server with an " +
+          "untrusted certificate when it should have failed");
+    }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getEventWriteFail());
+  }
+
+  private boolean doRequestWhenFailureExpected()
+      throws InterruptedException {
     sink.start();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.START_OR_ERROR, 5000));
 
     Transaction transaction = channel.getTransaction();
-
     transaction.begin();
-    for (int i = 0; i < 10; i++) {
-      channel.put(event);
-    }
+
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    channel.put(event);
+
     transaction.commit();
     transaction.close();
 
-    boolean failed = false;
+    boolean failed;
     try {
-      for (int i = 0; i < 5; i++) {
-        sink.process();
-        failed = true;
-      }
+      sink.process();
+      failed = false;
     } catch (EventDeliveryException ex) {
       logger.info("Correctly failed to send event", ex);
+      failed = true;
     }
 
-
     sink.stop();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.STOP_OR_ERROR, 5000));
 
-    server.close();
-
-    if (failed) {
-      Assert.fail("SSL-enabled sink successfully connected to a server with an untrusted certificate when it should have failed");
-    }
-    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
-    Assert.assertEquals(1, sinkCounter.getEventWriteFail());
+    return failed;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
index 687c635..c573fe7 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestThriftSink.java
@@ -60,17 +60,24 @@ public class TestThriftSink {
     try (ServerSocket socket = new ServerSocket(0)) {
       port = socket.getLocalPort();
     }
+    Context context = createBaseContext();
+    context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL);
+    sink.setChannel(channel);
+
+    Configurables.configure(sink, context);
+    Configurables.configure(channel, context);
+  }
+
+  private Context createBaseContext() {
     Context context = new Context();
 
     context.put("hostname", hostname);
     context.put("port", String.valueOf(port));
     context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
     context.put("request-timeout", String.valueOf(2000L));
-    context.put(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL);
-    sink.setChannel(channel);
 
-    Configurables.configure(sink, context);
-    Configurables.configure(channel, context);
+    return context;
   }
 
   @After
@@ -146,7 +153,7 @@ public class TestThriftSink {
     sink.process();
 
     // should throw another EventDeliveryException due to request timeout
-    delay.set(2500L); // because request-timeout = 3000
+    delay.set(2500L); // because request-timeout = 2000
     threw = false;
     try {
       sink.process();
@@ -201,32 +208,77 @@ public class TestThriftSink {
   }
 
   @Test
-  public void testSslProcess() throws Exception {
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
-    src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port,
-            ThriftRpcClient.COMPACT_PROTOCOL, "src/test/resources/keystorefile.jks",
-            "password", KeyManagerFactory.getDefaultAlgorithm(), "JKS");
-    Context context = new Context();
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+  public void testSslProcessWithComponentTruststore() throws Exception {
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
     context.put("truststore", "src/test/resources/truststorefile.jks");
     context.put("truststore-password", "password");
-    context.put("trustmanager-type", TrustManagerFactory.getDefaultAlgorithm());
 
     Configurables.configure(sink, context);
+
+    doTestSslProcess();
+  }
+
+  @Test
+  public void testSslProcessWithComponentTruststoreNoPassword() throws Exception {
+    Context context = createBaseContext();
+    context.put("ssl", String.valueOf(true));
+    context.put("truststore", "src/test/resources/truststorefile.jks");
+
+    Configurables.configure(sink, context);
+
+    doTestSslProcess();
+  }
+
+  @Test
+  public void testSslProcessWithGlobalTruststore() throws Exception {
+    System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststorefile.jks");
+    System.setProperty("javax.net.ssl.trustStorePassword", "password");
+
+    Context context = createBaseContext();
+    context.put("ssl", String.valueOf(true));
+
+    Configurables.configure(sink, context);
+
+    doTestSslProcess();
+
+    System.clearProperty("javax.net.ssl.trustStore");
+    System.clearProperty("javax.net.ssl.trustStorePassword");
+  }
+
+  @Test
+  public void testSslProcessWithGlobalTruststoreNoPassword() throws Exception {
+    System.setProperty("javax.net.ssl.trustStore", "src/test/resources/truststorefile.jks");
+
+    Context context = createBaseContext();
+    context.put("ssl", String.valueOf(true));
+
+    Configurables.configure(sink, context);
+
+    doTestSslProcess();
+
+    System.clearProperty("javax.net.ssl.trustStore");
+  }
+
+  private void doTestSslProcess() throws Exception {
+    src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port,
+        ThriftRpcClient.COMPACT_PROTOCOL, "src/test/resources/keystorefile.jks",
+        "password", KeyManagerFactory.getDefaultAlgorithm(), "JKS");
+
     channel.start();
     sink.start();
+
     Transaction transaction = channel.getTransaction();
     transaction.begin();
+
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     for (int i = 0; i < 11; i++) {
       channel.put(event);
     }
+
     transaction.commit();
     transaction.close();
+
     for (int i = 0; i < 6; i++) {
       Sink.Status status = sink.process();
       Assert.assertEquals(Sink.Status.READY, status);
@@ -241,48 +293,18 @@ public class TestThriftSink {
 
   @Test
   public void testSslSinkWithNonSslServer() throws Exception {
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(),
             port, ThriftRpcClient.COMPACT_PROTOCOL);
 
-    Context context = new Context();
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
     context.put("truststore", "src/test/resources/truststorefile.jks");
     context.put("truststore-password", "password");
-    context.put("trustmanager-type", TrustManagerFactory.getDefaultAlgorithm());
 
     Configurables.configure(sink, context);
-    channel.start();
-    sink.start();
-    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-            LifecycleState.START_OR_ERROR, 5000));
-    Transaction transaction = channel.getTransaction();
-    transaction.begin();
-    for (int i = 0; i < 11; i++) {
-      channel.put(event);
-    }
-    transaction.commit();
-    transaction.close();
 
-    boolean failed = false;
-    try {
-      for (int i = 0; i < 6; i++) {
-        Sink.Status status = sink.process();
-        failed = true;
-      }
-    } catch (EventDeliveryException ex) {
-      // This is correct
-    }
-
-    sink.stop();
-    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-            LifecycleState.STOP_OR_ERROR, 5000));
-    if (failed) {
+    boolean failed = doRequestWhenFailureExpected();
+    if (!failed) {
       Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, " +
                   "that's wrong.");
     }
@@ -290,48 +312,51 @@ public class TestThriftSink {
 
   @Test
   public void testSslSinkWithNonTrustedCert() throws Exception {
-    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port,
             ThriftRpcClient.COMPACT_PROTOCOL, "src/test/resources/keystorefile.jks",
             "password", KeyManagerFactory.getDefaultAlgorithm(), "JKS");
 
-    Context context = new Context();
-    context.put("hostname", hostname);
-    context.put("port", String.valueOf(port));
+    Context context = createBaseContext();
     context.put("ssl", String.valueOf(true));
-    context.put("batch-size", String.valueOf(2));
-    context.put("connect-timeout", String.valueOf(2000L));
-    context.put("request-timeout", String.valueOf(3000L));
 
     Configurables.configure(sink, context);
+
+    boolean failed = doRequestWhenFailureExpected();
+    if (!failed) {
+      Assert.fail("SSL-enabled sink successfully connected to a server with an " +
+                  "untrusted certificate when it should have failed");
+    }
+  }
+
+  private boolean doRequestWhenFailureExpected() throws Exception {
     channel.start();
     sink.start();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-            LifecycleState.START_OR_ERROR, 5000));
+        LifecycleState.START_OR_ERROR, 5000));
+
     Transaction transaction = channel.getTransaction();
     transaction.begin();
-    for (int i = 0; i < 11; i++) {
-      channel.put(event);
-    }
+
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    channel.put(event);
+
     transaction.commit();
     transaction.close();
 
-    boolean failed = false;
+    boolean failed;
     try {
-      for (int i = 0; i < 6; i++) {
-        Sink.Status status = sink.process();
-        failed = true;
-      }
+      Sink.Status status = sink.process();
+      failed = false;
     } catch (EventDeliveryException ex) {
       // This is correct
+      failed = true;
     }
 
     sink.stop();
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
-            LifecycleState.STOP_OR_ERROR, 5000));
-    if (failed) {
-      Assert.fail("SSL-enabled sink successfully connected to a server with an " +
-                  "untrusted certificate when it should have failed");
-    }
+        LifecycleState.STOP_OR_ERROR, 5000));
+
+    return failed;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index 6f784ea..21e65ad 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -329,7 +329,7 @@ public class TestAvroSource {
   }
 
   @Test
-  public void testSslRequest() throws InterruptedException, IOException {
+  public void testSslRequestWithComponentKeystore() throws InterruptedException, IOException {
 
     Context context = new Context();
 
@@ -339,7 +339,34 @@ public class TestAvroSource {
     context.put("keystore", "src/test/resources/server.p12");
     context.put("keystore-password", "password");
     context.put("keystore-type", "PKCS12");
+
+    Configurables.configure(source, context);
+
+    doSslRequest();
+  }
+
+  @Test
+  public void testSslRequestWithGlobalKeystore() throws InterruptedException, IOException {
+
+    System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.p12");
+    System.setProperty("javax.net.ssl.keyStorePassword", "password");
+    System.setProperty("javax.net.ssl.keyStoreType", "PKCS12");
+
+    Context context = new Context();
+
+    context.put("port", String.valueOf(selectedPort = getFreePort()));
+    context.put("bind", "0.0.0.0");
+    context.put("ssl", "true");
+
     Configurables.configure(source, context);
+
+    doSslRequest();
+
+    System.clearProperty("javax.net.ssl.keyStore");
+    System.clearProperty("javax.net.ssl.keyStorePassword");
+  }
+
+  private void doSslRequest() throws InterruptedException, IOException {
     source.start();
 
     Assert
@@ -350,7 +377,7 @@ public class TestAvroSource {
 
     AvroSourceProtocol client = SpecificRequestor.getClient(
         AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
-        selectedPort), new SSLChannelFactory()));
+            selectedPort), new SSLChannelFactory()));
 
     AvroFlumeEvent avroEvent = new AvroFlumeEvent();
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
index d594276..610d6fc 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
@@ -102,24 +102,55 @@ public class TestThriftSource {
   }
 
   @Test
-  public void testAppendSSL() throws Exception {
-    Properties sslprops = (Properties)props.clone();
-    sslprops.put("ssl", "true");
-    sslprops.put("truststore", "src/test/resources/truststorefile.jks");
-    sslprops.put("truststore-password", "password");
-    sslprops.put("trustmanager-type", TrustManagerFactory.getDefaultAlgorithm());
-    client = RpcClientFactory.getThriftInstance(sslprops);
+  public void testAppendSSLWithComponentKeystore() throws Exception {
 
     Context context = new Context();
     channel.configure(context);
     configureSource();
+
     context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
     context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
     context.put("ssl", "true");
     context.put("keystore", "src/test/resources/keystorefile.jks");
     context.put("keystore-password", "password");
-    context.put("keymanager-type", KeyManagerFactory.getDefaultAlgorithm());
+    context.put("keystore-type", "JKS");
+
+    Configurables.configure(source, context);
+
+    doAppendSSL();
+  }
+
+  @Test
+  public void testAppendSSLWithGlobalKeystore() throws Exception {
+
+    System.setProperty("javax.net.ssl.keyStore", "src/test/resources/keystorefile.jks");
+    System.setProperty("javax.net.ssl.keyStorePassword", "password");
+    System.setProperty("javax.net.ssl.keyStoreType", "JKS");
+
+    Context context = new Context();
+    channel.configure(context);
+    configureSource();
+
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    context.put("ssl", "true");
+
     Configurables.configure(source, context);
+
+    doAppendSSL();
+
+    System.clearProperty("javax.net.ssl.keyStore");
+    System.clearProperty("javax.net.ssl.keyStorePassword");
+    System.clearProperty("javax.net.ssl.keyStoreType");
+  }
+
+  private void doAppendSSL() throws EventDeliveryException {
+    Properties sslprops = (Properties)props.clone();
+    sslprops.put("ssl", "true");
+    sslprops.put("truststore", "src/test/resources/truststorefile.jks");
+    sslprops.put("truststore-password", "password");
+    client = RpcClientFactory.getThriftInstance(sslprops);
+
     source.start();
     for (int i = 0; i < 30; i++) {
       client.append(EventBuilder.withBody(String.valueOf(i).getBytes()));

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
index 04eec24..39949e2 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -22,7 +22,6 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import junit.framework.Assert;
 import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -90,15 +89,18 @@ import static org.mockito.Mockito.doThrow;
  */
 public class TestHTTPSource {
 
-  private static HTTPSource source;
+  private static HTTPSource httpSource;
   private static HTTPSource httpsSource;
+  private static HTTPSource httpsGlobalKeystoreSource;
 
-  private static Channel channel;
+  private static Channel httpChannel;
   private static Channel httpsChannel;
-  private static int selectedPort;
-  private static int sslPort;
-  HttpClient httpClient;
-  HttpPost postRequest;
+  private static Channel httpsGlobalKeystoreChannel;
+  private static int httpPort;
+  private static int httpsPort;
+  private static int httpsGlobalKeystorePort;
+  private HttpClient httpClient;
+  private HttpPost postRequest;
 
   private static int findFreePort() throws IOException {
     ServerSocket socket = new ServerSocket(0);
@@ -107,17 +109,17 @@ public class TestHTTPSource {
     return port;
   }
 
-  private static Context getDefaultNonSecureContext(int selectedPort) throws IOException {
+  private static Context getDefaultNonSecureContext(int port) throws IOException {
     Context ctx = new Context();
     ctx.put(HTTPSourceConfigurationConstants.CONFIG_BIND, "0.0.0.0");
-    ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(selectedPort));
+    ctx.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port));
     ctx.put("QueuedThreadPool.MaxThreads", "100");
     return ctx;
   }
 
-  private static Context getDefaultSecureContext(int sslPort) throws IOException {
+  private static Context getDefaultSecureContext(int port) throws IOException {
     Context sslContext = new Context();
-    sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(sslPort));
+    sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port));
     sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
     sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password");
     sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE,
@@ -125,21 +127,42 @@ public class TestHTTPSource {
     return sslContext;
   }
 
+  private static Context getDefaultSecureContextGlobalKeystore(int port) throws IOException {
+    System.setProperty("javax.net.ssl.keyStore", "src/test/resources/jettykeystore");
+    System.setProperty("javax.net.ssl.keyStorePassword", "password");
+
+    Context sslContext = new Context();
+    sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, String.valueOf(port));
+    sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
+    return sslContext;
+  }
+
   @BeforeClass
   public static void setUpClass() throws Exception {
-    source = new HTTPSource();
-    channel = new MemoryChannel();
-    selectedPort = findFreePort();
-    configureSourceAndChannel(source, channel, getDefaultNonSecureContext(selectedPort));
-    channel.start();
-    source.start();
+    httpSource = new HTTPSource();
+    httpChannel = new MemoryChannel();
+    httpPort = findFreePort();
+    configureSourceAndChannel(httpSource, httpChannel, getDefaultNonSecureContext(httpPort));
+    httpChannel.start();
+    httpSource.start();
 
     httpsSource = new HTTPSource();
     httpsChannel = new MemoryChannel();
-    sslPort = findFreePort();
-    configureSourceAndChannel(httpsSource, httpsChannel, getDefaultSecureContext(sslPort));
+    httpsPort = findFreePort();
+    configureSourceAndChannel(httpsSource, httpsChannel, getDefaultSecureContext(httpsPort));
     httpsChannel.start();
     httpsSource.start();
+
+    httpsGlobalKeystoreSource = new HTTPSource();
+    httpsGlobalKeystoreChannel = new MemoryChannel();
+    httpsGlobalKeystorePort = findFreePort();
+    configureSourceAndChannel(httpsGlobalKeystoreSource, httpsGlobalKeystoreChannel,
+        getDefaultSecureContextGlobalKeystore(httpsGlobalKeystorePort));
+    httpsGlobalKeystoreChannel.start();
+    httpsGlobalKeystoreSource.start();
+
+    System.clearProperty("javax.net.ssl.keyStore");
+    System.clearProperty("javax.net.ssl.keyStorePassword");
   }
 
   private static void configureSourceAndChannel(
@@ -158,17 +181,19 @@ public class TestHTTPSource {
 
   @AfterClass
   public static void tearDownClass() throws Exception {
-    source.stop();
-    channel.stop();
+    httpSource.stop();
+    httpChannel.stop();
     httpsSource.stop();
     httpsChannel.stop();
+    httpsGlobalKeystoreSource.stop();
+    httpsGlobalKeystoreChannel.stop();
   }
 
   @Before
   public void setUp() {
     HttpClientBuilder builder = HttpClientBuilder.create();
     httpClient = builder.build();
-    postRequest = new HttpPost("http://0.0.0.0:" + selectedPort);
+    postRequest = new HttpPost("http://0.0.0.0:" + httpPort);
   }
 
   @Test
@@ -185,14 +210,14 @@ public class TestHTTPSource {
 
     Assert.assertEquals(HttpServletResponse.SC_OK,
             response.getStatusLine().getStatusCode());
-    Transaction tx = channel.getTransaction();
+    Transaction tx = httpChannel.getTransaction();
     tx.begin();
-    Event e = channel.take();
+    Event e = httpChannel.take();
     Assert.assertNotNull(e);
     Assert.assertEquals("b", e.getHeaders().get("a"));
     Assert.assertEquals("random_body", new String(e.getBody(), "UTF-8"));
 
-    e = channel.take();
+    e = httpChannel.take();
     Assert.assertNotNull(e);
     Assert.assertEquals("f", e.getHeaders().get("e"));
     Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-8"));
@@ -202,12 +227,12 @@ public class TestHTTPSource {
 
   @Test
   public void testTrace() throws Exception {
-    doTestForbidden(new HttpTrace("http://0.0.0.0:" + selectedPort));
+    doTestForbidden(new HttpTrace("http://0.0.0.0:" + httpPort));
   }
 
   @Test
   public void testOptions() throws Exception {
-    doTestForbidden(new HttpOptions("http://0.0.0.0:" + selectedPort));
+    doTestForbidden(new HttpOptions("http://0.0.0.0:" + httpPort));
   }
 
   private void doTestForbidden(HttpRequestBase request) throws Exception {
@@ -228,14 +253,14 @@ public class TestHTTPSource {
 
     Assert.assertEquals(HttpServletResponse.SC_OK,
             response.getStatusLine().getStatusCode());
-    Transaction tx = channel.getTransaction();
+    Transaction tx = httpChannel.getTransaction();
     tx.begin();
-    Event e = channel.take();
+    Event e = httpChannel.take();
     Assert.assertNotNull(e);
     Assert.assertEquals("b", e.getHeaders().get("a"));
     Assert.assertEquals("random_body", new String(e.getBody(), "UTF-16"));
 
-    e = channel.take();
+    e = httpChannel.take();
     Assert.assertNotNull(e);
     Assert.assertEquals("f", e.getHeaders().get("e"));
     Assert.assertEquals("random_body2", new String(e.getBody(), "UTF-16"));
@@ -253,7 +278,7 @@ public class TestHTTPSource {
 
     Assert.assertEquals(HttpServletResponse.SC_BAD_REQUEST,
             response.getStatusLine().getStatusCode());
-    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter");
     Assert.assertEquals(1, sc.getEventReadFail());
 
   }
@@ -277,12 +302,12 @@ public class TestHTTPSource {
   public void testCounterGenericFail() throws Exception {
     ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
     doThrow(new RuntimeException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
-    ChannelProcessor oldCp = source.getChannelProcessor();
-    source.setChannelProcessor(cp);
+    ChannelProcessor oldCp = httpSource.getChannelProcessor();
+    httpSource.setChannelProcessor(cp);
     testBatchWithVariousEncoding("UTF-8");
-    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter");
     Assert.assertEquals(1, sc.getGenericProcessingFail());
-    source.setChannelProcessor(oldCp);
+    httpSource.setChannelProcessor(oldCp);
   }
 
   @Test
@@ -293,9 +318,9 @@ public class TestHTTPSource {
     postRequest.setEntity(input);
 
     httpClient.execute(postRequest);
-    Transaction tx = channel.getTransaction();
+    Transaction tx = httpChannel.getTransaction();
     tx.begin();
-    Event e = channel.take();
+    Event e = httpChannel.take();
     Assert.assertNotNull(e);
     Assert.assertEquals("b", e.getHeaders().get("a"));
     Assert.assertEquals("random_body", new String(e.getBody(),"UTF-8"));
@@ -323,9 +348,9 @@ public class TestHTTPSource {
     Assert.assertTrue(resp.getHeaders("X-Powered-By").length == 0);
     Assert.assertTrue(resp.getHeaders("Server").length == 1);
 
-    Transaction tx = channel.getTransaction();
+    Transaction tx = httpChannel.getTransaction();
     tx.begin();
-    Event e = channel.take();
+    Event e = httpChannel.take();
     Assert.assertNotNull(e);
     tx.commit();
     tx.close();
@@ -375,7 +400,7 @@ public class TestHTTPSource {
 
     newPostRequest = new HttpPost("http://0.0.0.0:" + newPort);
     try {
-      doTestHttps(null, newPort);
+      doTestHttps(null, newPort, httpsChannel);
       //We are testing that this fails because we've deliberately configured the wrong protocols
       Assert.assertTrue(false);
     } catch (AssertionError ex) {
@@ -390,22 +415,22 @@ public class TestHTTPSource {
     HttpResponse response = putWithEncoding("UTF-8", 150).response;
     Assert.assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
             response.getStatusLine().getStatusCode());
-    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(httpSource, "sourceCounter");
     Assert.assertEquals(1, sc.getChannelWriteFail());
   }
 
   @Test
   public void testFail() throws Exception {
     HTTPSourceHandler handler = field("handler").ofType(HTTPSourceHandler.class)
-            .in(source).get();
+            .in(httpSource).get();
     //Cause an exception in the source - this is equivalent to any exception
     //thrown by the handler since the handler is called inside a try-catch
-    field("handler").ofType(HTTPSourceHandler.class).in(source).set(null);
+    field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(null);
     HttpResponse response = putWithEncoding("UTF-8", 1).response;
     Assert.assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
             response.getStatusLine().getStatusCode());
     //Set the original handler back so tests don't fail after this runs.
-    field("handler").ofType(HTTPSourceHandler.class).in(source).set(handler);
+    field("handler").ofType(HTTPSourceHandler.class).in(httpSource).set(handler);
   }
 
   @Test
@@ -458,15 +483,20 @@ public class TestHTTPSource {
 
   @Test
   public void testHttps() throws Exception {
-    doTestHttps(null, sslPort);
+    doTestHttps(null, httpsPort, httpsChannel);
   }
 
   @Test (expected = javax.net.ssl.SSLHandshakeException.class)
   public void testHttpsSSLv3() throws Exception {
-    doTestHttps("SSLv3", sslPort);
+    doTestHttps("SSLv3", httpsPort, httpsChannel);
+  }
+
+  @Test
+  public void testHttpsGlobalKeystore() throws Exception {
+    doTestHttps(null, httpsGlobalKeystorePort, httpsGlobalKeystoreChannel);
   }
 
-  public void doTestHttps(String protocol, int port) throws Exception {
+  private void doTestHttps(String protocol, int port, Channel channel) throws Exception {
     Type listType = new TypeToken<List<JSONEvent>>() {
     }.getType();
     List<JSONEvent> events = new ArrayList<JSONEvent>();
@@ -539,10 +569,10 @@ public class TestHTTPSource {
       int statusCode = httpsURLConnection.getResponseCode();
       Assert.assertEquals(200, statusCode);
 
-      transaction = httpsChannel.getTransaction();
+      transaction = channel.getTransaction();
       transaction.begin();
       for (int i = 0; i < 10; i++) {
-        Event e = httpsChannel.take();
+        Event e = channel.take();
         Assert.assertNotNull(e);
         Assert.assertEquals(String.valueOf(i), e.getHeaders().get("MsgNum"));
       }
@@ -577,7 +607,7 @@ public class TestHTTPSource {
     String json = gson.toJson(events, listType);
     HttpURLConnection httpURLConnection = null;
     try {
-      URL url = new URL("http://0.0.0.0:" + sslPort);
+      URL url = new URL("http://0.0.0.0:" + httpsPort);
       httpURLConnection = (HttpURLConnection) url.openConnection();
       httpURLConnection.setDoInput(true);
       httpURLConnection.setDoOutput(true);
@@ -594,12 +624,12 @@ public class TestHTTPSource {
   }
 
   private void takeWithEncoding(String encoding, int n, List<JSONEvent> events) throws Exception {
-    Transaction tx = channel.getTransaction();
+    Transaction tx = httpChannel.getTransaction();
     tx.begin();
     Event e = null;
     int i = 0;
     while (true) {
-      e = channel.take();
+      e = httpChannel.take();
       if (e == null) {
         break;
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3b0c183..16adc79 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -749,6 +749,102 @@ the selector will attempt to write the events to the optional channels. Any
 failures are simply ignored in that case.
 
 
+SSL/TLS support
+---------------
+
+Several Flume components support the SSL/TLS protocols in order to communicate with other systems
+securely.
+
+===============  ======================
+Component        SSL server or client
+===============  ======================
+Avro Source      server
+Avro Sink        client
+Thrift Source    server
+Thrift Sink      client
+Kafka Source     client
+Kafka Channel    client
+Kafka Sink       client
+HTTP Source      server
+JMS Source       client
+===============  ======================
+
+The SSL compatible components have several configuration parameters to set up SSL, like
+enable SSL flag, keystore / truststore parameters (location, password, type) and additional
+SSL parameters (eg. disabled protocols).
+
+Enabling SSL for a component is always specified at component level in the agent configuration file.
+So some components may be configured to use SSL while others not (even with the same component type).
+
+The keystore / truststore setup can be specified at component level or globally.
+
+In case of the component level setup, the keystore / truststore is configured in the agent
+configuration file through component specific parameters. The advantage of this method is that the
+components can use different keystores (if this would be needed). The disadvantage is that the
+keystore parameters must be copied for each component in the agent configuration file.
+The component level setup is optional, but if it is defined, it has higher precedence than
+the global parameters.
+
+With the global setup, it is enough to define the keystore / truststore parameters once
+and use the same settings for all components, which means less and more centralized configuration.
+
+The global setup can be configured either through system properties or through environment variables.
+
+==================================  ===============================  ==================================
+System property                     Environment variable             Description
+==================================  ===============================  ==================================
+javax.net.ssl.keyStore              FLUME_SSL_KEYSTORE_PATH          Keystore location
+javax.net.ssl.keyStorePassword      FLUME_SSL_KEYSTORE_PASSWORD      Keystore password
+javax.net.ssl.keyStoreType          FLUME_SSL_KEYSTORE_TYPE          Keystore type (by default JKS)
+javax.net.ssl.trustStore            FLUME_SSL_TRUSTSTORE_PATH        Truststore location
+javax.net.ssl.trustStorePassword    FLUME_SSL_TRUSTSTORE_PASSWORD    Truststore password
+javax.net.ssl.trustStoreType        FLUME_SSL_TRUSTSTORE_TYPE        Truststore type (by default JKS)
+==================================  ===============================  ==================================
+
+The SSL system properties can either be passed on the command line or by setting the ``JAVA_OPTS``
+environment variable in *conf/flume-env.sh*. (Although, using the command line is inadvisable because
+the commands including the passwords will be saved to the command history.)
+
+.. code-block:: properties
+
+    export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
+    export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"
+
+Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is
+a standard way for setting up SSL. On the other hand, specifying passwords in system properties
+means that the passwords can be seen in the process list. For cases where it is not acceptable,
+it is also be possible to define the parameters in environment variables. Flume initializes
+the JSSE system properties from the corresponding environment variables internally in this case.
+
+The SSL environment variables can either be set in the shell environment before
+starting Flume or in *conf/flume-env.sh*. (Although, using the command line is inadvisable because
+the commands including the passwords will be saved to the command history.)
+
+.. code-block:: properties
+
+    export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
+    export FLUME_SSL_KEYSTORE_PASSWORD=password
+
+**Please note:**
+
+* SSL must be enabled at component level. Specifying the global SSL parameters alone will not
+  have any effect.
+* If the global SSL parameters are specified at multiple levels, the priority is the
+  following (from higher to lower):
+
+  * component parameters in agent config
+  * system properties
+  * environment variables
+
+* If SSL is enabled for a component, but the SSL parameters are not specified in any of the ways
+  described above, then
+
+  * in case of keystores: configuration error
+  * in case of truststores: the default truststore will be used (``jssecacerts`` / ``cacerts`` in Oracle JDK)
+* The trustore password is optional in all cases. If not specified, then no integrity check will be
+  performed on the truststore when it is opened by the JDK.
+
+
 Flume Sources
 -------------
 
@@ -773,10 +869,19 @@ selector.*
 interceptors         --                Space-separated list of interceptors
 interceptors.*
 compression-type     none              This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
-ssl                  false             Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
-keystore             --                This is the path to a Java keystore file. Required for SSL.
-keystore-password    --                The password for the Java keystore. Required for SSL.
+ssl                  false             Set this to true to enable SSL encryption. If SSL is enabled,
+                                       you must also specify a "keystore" and a "keystore-password",
+                                       either through component level parameters (see below)
+                                       or as global SSL parameters (see `SSL/TLS support`_ section).
+keystore             --                This is the path to a Java keystore file.
+                                       If not specified here, then the global keystore will be used
+                                       (if defined, otherwise configuration error).
+keystore-password    --                The password for the Java keystore.
+                                       If not specified here, then the global keystore password will be used
+                                       (if defined, otherwise configuration error).
 keystore-type        JKS               The type of the Java keystore. This can be "JKS" or "PKCS12".
+                                       If not specified here, then the global keystore type will be used
+                                       (if defined, otherwise the default is JKS).
 exclude-protocols    SSLv3             Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
 ipFilter             false             Set this to true to enable ipFiltering for netty
 ipFilterRules        --                Define N netty ipFilter pattern rules with this config.
@@ -819,9 +924,9 @@ agent-principal and agent-keytab are the properties used by the
 Thrift source to authenticate to the kerberos KDC.
 Required properties are in **bold**.
 
-==================   ===========  ===================================================
+==================   ===========  ==================================================================
 Property Name        Default      Description
-==================   ===========  ===================================================
+==================   ===========  ==================================================================
 **channels**         --
 **type**             --           The component type name, needs to be ``thrift``
 **bind**             --           hostname or IP address to listen on
@@ -831,15 +936,24 @@ selector.type
 selector.*
 interceptors         --           Space separated list of interceptors
 interceptors.*
-ssl                  false        Set this to true to enable SSL encryption. You must also specify a "keystore" and a "keystore-password".
-keystore             --           This is the path to a Java keystore file. Required for SSL.
-keystore-password    --           The password for the Java keystore. Required for SSL.
+ssl                  false        Set this to true to enable SSL encryption. If SSL is enabled,
+                                  you must also specify a "keystore" and a "keystore-password",
+                                  either through component level parameters (see below)
+                                  or as global SSL parameters (see `SSL/TLS support`_ section)
+keystore             --           This is the path to a Java keystore file.
+                                  If not specified here, then the global keystore will be used
+                                  (if defined, otherwise configuration error).
+keystore-password    --           The password for the Java keystore.
+                                  If not specified here, then the global keystore password will be used
+                                  (if defined, otherwise configuration error).
 keystore-type        JKS          The type of the Java keystore. This can be "JKS" or "PKCS12".
+                                  If not specified here, then the global keystore type will be used
+                                  (if defined, otherwise the default is JKS).
 exclude-protocols    SSLv3        Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
 kerberos             false        Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab  are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
 agent-principal      --           The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
 agent-keytab         —-           The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.
-==================   ===========  ===================================================
+==================   ===========  ==================================================================
 
 Example for agent named a1:
 
@@ -963,8 +1077,8 @@ durableSubscriptionName     --           Name used to identify the durable subsc
 =========================   ===========  ==============================================================
 
 
-Converter
-'''''''''
+JMS message converter
+'''''''''''''''''''''
 The JMS source allows pluggable converters, though it's likely the default converter will work
 for most purposes. The default converter is able to convert Bytes, Text, and Object messages
 to FlumeEvents. In all cases, the properties in the message are added as headers to the
@@ -998,8 +1112,8 @@ Example for agent named a1:
   a1.sources.r1.destinationType = QUEUE
 
 
-SSL/TLS support
-'''''''''''''''
+SSL and JMS Source
+''''''''''''''''''
 
 JMS client implementations typically support to configure SSL/TLS via some Java system properties defined by JSSE
 (Java Secure Socket Extension). Specifying these system properties for Flume's JVM, JMS Source (or more precisely the
@@ -1007,9 +1121,6 @@ JMS client implementation used by the JMS Source) can connect to the JMS server
 server has also been set up to use SSL).
 It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.
 
-The JSSE Java system properties can either be passed on the command line or by setting the ``JAVA_OPTS`` environment
-variable in *conf/flume-env.sh* (the examples below show the second approach).
-
 The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed
 descriptions about the server side setup of the different JMS providers and also full working configuration examples on
 Flume Wiki.
@@ -1017,13 +1128,8 @@ Flume Wiki.
 **SSL transport / server authentication:**
 
 If the JMS server uses self-signed certificate or its certificate is signed by a non-trusted CA (eg. the company's own
-CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume via the following JSSE
-Java system properties:
-
-.. code-block:: properties
-
-    export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore.jks"
-    export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.trustStorePassword=password"
+CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via
+the global SSL parameters. For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Some JMS providers require SSL specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (eg.
 ActiveMQ uses ssl:// URL prefix instead of tcp://).
@@ -1035,13 +1141,8 @@ config file.
 JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual
 user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).
 
-The keystore containing Flume's key used for the authentication needs to be configured via the following JSSE Java
-system properties (similarly to the truststore properties above):
-
-.. code-block:: properties
-
-    export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
-    export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"
+The keystore containing Flume's key used for the authentication needs to be configured via the global SSL parameters
+again. For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 The keystore should contain only one key (if multiple keys are present, then the first one will be used).
 The key password must be the same as the keystore password.
@@ -1049,6 +1150,13 @@ The key password must be the same as the keystore password.
 In case of client certificate authentication, it is not needed to specify the ``userName`` / ``passwordFile`` properties
 for the JMS Source in the Flume agent config file.
 
+**Please note:**
+
+There are no component level configuration parameters for JMS Source unlike in case of other components.
+No enable SSL flag either.
+SSL setup is controlled by JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence
+of the truststore / keystore.
+
 
 Spooling Directory Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -1440,9 +1548,12 @@ Example configuration with server side authentication and data encryption.
     a1.sources.source1.kafka.topics = mytopic
     a1.sources.source1.kafka.consumer.group.id = flume-consumer
     a1.sources.source1.kafka.consumer.security.protocol = SSL
+    # optional, the global truststore can be used alternatively
     a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
     a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
 
+Specyfing the truststore is optional here, the global truststore can be used instead.
+For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Note: By default the property ``ssl.endpoint.identification.algorithm``
 is not defined, so hostname verification is not performed.
@@ -1458,13 +1569,15 @@ against one of the following two fields:
 #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
 #) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
 
-If client side authentication is also required then additionally the following should be added to Flume agent configuration.
+If client side authentication is also required then additionally the following needs to be added to Flume agent
+configuration or the global SSL setup can be used (see `SSL/TLS support`_ section).
 Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either
 individually or by their signature chain. Common example is to sign each client certificate by a single Root CA
 which in turn is trusted by Kafka brokers.
 
 .. code-block:: properties
 
+    # optional, the global keystore can be used alternatively
     a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
     a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
 
@@ -1511,6 +1624,7 @@ Example secure configuration using SASL_SSL:
     a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
     a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
     a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
+    # optional, the global truststore can be used alternatively
     a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
     a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
 
@@ -1801,8 +1915,14 @@ interceptors          --                                            Space-separa
 interceptors.*
 enableSSL             false                                         Set the property true, to enable SSL. *HTTP Source does not support SSLv3.*
 excludeProtocols      SSLv3                                         Space-separated list of SSL/TLS protocols to exclude. SSLv3 is always excluded.
-keystore                                                            Location of the keystore includng keystore file name
-keystorePassword                                                    Keystore password
+keystore                                                            Location of the keystore including keystore file name.
+                                                                    If SSL is enabled but the keystore is not specified here,
+                                                                    then the global keystore will be used
+                                                                    (if defined, otherwise configuration error).
+keystorePassword                                                    Keystore password.
+                                                                    If SSL is enabled but the keystore password is not specified here,
+                                                                    then the global keystore password will be used
+                                                                    (if defined, otherwise configuration error).
 QueuedThreadPool.*                                                  Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool.
                                                                     N.B. QueuedThreadPool will only be used if at least one property of this class is set.
 HttpConfiguration.*                                                 Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration
@@ -2372,9 +2492,9 @@ compression-type             none
 compression-level            6                                                      The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
 ssl                          false                                                  Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
 trust-all-certs              false                                                  If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and "listen in" on the encrypted connection.
-truststore                   --                                                     The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
-truststore-password          --                                                     The password for the specified truststore.
-truststore-type              JKS                                                    The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
+truststore                   --                                                     The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source's SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
+truststore-password          --                                                     The password for the truststore. If not specified, then the global keystore password will be used (if defined).
+truststore-type              JKS                                                    The type of the Java truststore. This can be "JKS" or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS).
 exclude-protocols            SSLv3                                                  Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
 maxIoWorkers                 2 * the number of available processors in the machine  The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.
 ==========================   =====================================================  ===========================================================================================
@@ -2417,9 +2537,9 @@ connect-timeout              20000    Amount of time (ms) to allow for the first
 request-timeout              20000    Amount of time (ms) to allow for requests after the first.
 connection-reset-interval    none     Amount of time (s) before the connection to the next hop is reset. This will force the Thrift Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.
 ssl                          false    Set to true to enable SSL for this ThriftSink. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type"
-truststore                   --       The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
-truststore-password          --       The password for the specified truststore.
-truststore-type              JKS      The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
+truststore                   --       The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
+truststore-password          --       The password for the truststore. If not specified, then the global keystore password will be used (if defined).
+truststore-type              JKS      The type of the Java truststore. This can be "JKS" or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS).
 exclude-protocols            SSLv3    Space-separated list of SSL/TLS protocols to exclude
 kerberos                     false    Set to true to enable kerberos authentication. In kerberos mode, client-principal, client-keytab and server-principal are required for successful authentication and communication to a kerberos enabled Thrift Source.
 client-principal             —-       The kerberos principal used by the Thrift Sink to authenticate to the kerberos KDC.
@@ -3002,9 +3122,12 @@ Example configuration with server side authentication and data encryption.
     a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
     a1.sinks.sink1.kafka.topic = mytopic
     a1.sinks.sink1.kafka.producer.security.protocol = SSL
+    # optional, the global truststore can be used alternatively
     a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
     a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
 
+Specyfing the truststore is optional here, the global truststore can be used instead.
+For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Note: By default the property ``ssl.endpoint.identification.algorithm``
 is not defined, so hostname verification is not performed.
@@ -3020,13 +3143,15 @@ against one of the following two fields:
 #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
 #) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
 
-If client side authentication is also required then additionally the following should be added to Flume agent configuration.
+If client side authentication is also required then additionally the following needs to be added to Flume agent
+configuration or the global SSL setup can be used (see `SSL/TLS support`_ section).
 Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either
 individually or by their signature chain. Common example is to sign each client certificate by a single Root CA
 which in turn is trusted by Kafka brokers.
 
 .. code-block:: properties
 
+    # optional, the global keystore can be used alternatively
     a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
     a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>
 
@@ -3072,6 +3197,7 @@ Example secure configuration using SASL_SSL:
     a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
     a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
     a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
+    # optional, the global truststore can be used alternatively
     a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
     a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>
 
@@ -3401,12 +3527,16 @@ Example configuration with server side authentication and data encryption.
     a1.channels.channel1.kafka.topic = channel1
     a1.channels.channel1.kafka.consumer.group.id = flume-consumer
     a1.channels.channel1.kafka.producer.security.protocol = SSL
+    # optional, the global truststore can be used alternatively
     a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
     a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
     a1.channels.channel1.kafka.consumer.security.protocol = SSL
+    # optional, the global truststore can be used alternatively
     a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
     a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
 
+Specyfing the truststore is optional here, the global truststore can be used instead.
+For more details about the global SSL setup, see the `SSL/TLS support`_ section.
 
 Note: By default the property ``ssl.endpoint.identification.algorithm``
 is not defined, so hostname verification is not performed.
@@ -3423,15 +3553,18 @@ against one of the following two fields:
 #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
 #) Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6
 
-If client side authentication is also required then additionally the following should be added to Flume agent configuration.
+If client side authentication is also required then additionally the following needs to be added to Flume agent
+configuration or the global SSL setup can be used (see `SSL/TLS support`_ section).
 Each Flume agent has to have its client certificate which has to be trusted by Kafka brokers either
 individually or by their signature chain. Common example is to sign each client certificate by a single Root CA
 which in turn is trusted by Kafka brokers.
 
 .. code-block:: properties
 
+    # optional, the global keystore can be used alternatively
     a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
     a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
+    # optional, the global keystore can be used alternatively
     a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
     a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>
 
@@ -3482,11 +3615,13 @@ Example secure configuration using SASL_SSL:
     a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
     a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
     a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
+    # optional, the global truststore can be used alternatively
     a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
     a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
     a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
     a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
     a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
+    # optional, the global truststore can be used alternatively
     a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
     a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 7111f60..406bb7d 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -41,6 +41,7 @@ import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.util.SSLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -251,8 +252,7 @@ public class Application {
   public static void main(String[] args) {
 
     try {
-
-      boolean isZkConfigured = false;
+      SSLUtil.initGlobalSSLParameters();
 
       Options options = new Options();
 
@@ -294,10 +294,12 @@ public class Application {
       String agentName = commandLine.getOptionValue('n');
       boolean reload = !commandLine.hasOption("no-reload-conf");
 
+      boolean isZkConfigured = false;
       if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
         isZkConfigured = true;
       }
-      Application application = null;
+
+      Application application;
       if (isZkConfigured) {
         // get options
         String zkConnectionStr = commandLine.getOptionValue('z');

http://git-wip-us.apache.org/repos/asf/flume/blob/c5168c90/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index b61eb79..9bcdf51 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -65,6 +65,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
+import org.apache.flume.util.SSLUtil;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -586,11 +587,13 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
     trustAllCerts = Boolean.parseBoolean(properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_TRUST_ALL_CERTS));
     truststore = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE);
+        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE, SSLUtil.getGlobalTruststorePath());
     truststorePassword = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
+        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD,
+        SSLUtil.getGlobalTruststorePassword());
     truststoreType = properties.getProperty(
-        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
+        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE,
+        SSLUtil.getGlobalTruststoreType("JKS"));
     String excludeProtocolsStr = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_EXCLUDE_PROTOCOLS);
     if (excludeProtocolsStr == null) {
@@ -716,12 +719,10 @@ public class NettyAvroRpcClient extends AbstractRpcClient implements RpcClient {
             KeyStore keystore = null;
 
             if (truststore != null) {
-              if (truststorePassword == null) {
-                throw new NullPointerException("truststore password is null");
-              }
               InputStream truststoreStream = new FileInputStream(truststore);
               keystore = KeyStore.getInstance(truststoreType);
-              keystore.load(truststoreStream, truststorePassword.toCharArray());
+              keystore.load(truststoreStream,
+                  truststorePassword != null ? truststorePassword.toCharArray() : null);
             }
 
             TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");