You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/06/13 08:04:24 UTC

git commit: FLUME-997. Support secure transport mechanism.

Updated Branches:
  refs/heads/trunk c57ebd1d2 -> a964e7ab3


FLUME-997. Support secure transport mechanism.

(Joey Echeverria via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a964e7ab
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a964e7ab
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a964e7ab

Branch: refs/heads/trunk
Commit: a964e7ab3cfacbafb7e086d49ae2b94195b9c0df
Parents: c57ebd1
Author: Mike Percy <mp...@apache.org>
Authored: Wed Jun 12 23:03:21 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Jun 12 23:03:21 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/AvroSource.java     |  98 ++++-
 .../org/apache/flume/sink/TestAvroSink.java     | 387 +++++++++++++++++++
 .../org/apache/flume/source/TestAvroSource.java | 120 +++++-
 flume-ng-core/src/test/resources/server.p12     | Bin 0 -> 1637 bytes
 flume-ng-core/src/test/resources/truststore.jks | Bin 0 -> 687 bytes
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   7 +
 .../apache/flume/api/NettyAvroRpcClient.java    | 127 +++++-
 .../api/RpcClientConfigurationConstants.java    |   9 +
 8 files changed, 727 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 517d545..edc2574 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -20,7 +20,10 @@
 package org.apache.flume.source;
 
 import com.google.common.base.Throwables;
+import java.io.FileInputStream;
 import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.security.Security;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,6 +31,9 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
@@ -52,6 +58,7 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,9 +128,16 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private static final String PORT_KEY = "port";
   private static final String BIND_KEY = "bind";
   private static final String COMPRESSION_TYPE = "compression-type";
+  private static final String KEYSTORE_KEY = "keystore";
+  private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
+  private static final String KEYSTORE_TYPE_KEY = "keystore-type";
   private int port;
   private String bindAddress;
   private String compressionType;
+  private String keystore;
+  private String keystorePassword;
+  private String keystoreType;
+  private boolean enableSsl = false;
 
   private Server server;
   private SourceCounter sourceCounter;
@@ -146,6 +160,19 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
               context.getString(THREADS));
     }
 
+    keystore = context.getString(KEYSTORE_KEY);
+    keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
+    keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
+    if (keystore != null && keystorePassword != null) {
+      try {
+        KeyStore ks = KeyStore.getInstance(keystoreType);
+        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
+        enableSsl = true;
+      } catch (Exception ex) {
+        logger.warn("AVRO source configured with invalid keystore " + keystore, ex);
+      }
+    }
+
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
@@ -196,8 +223,11 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
 
   private ChannelPipelineFactory initChannelPipelineFactory() {
     ChannelPipelineFactory pipelineFactory;
-    if (compressionType.equalsIgnoreCase("deflate")) {
-      pipelineFactory = new CompressionChannelPipelineFactory();
+    boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
+    if (enableCompression || enableSsl) {
+      pipelineFactory = new SSLCompressionChannelPipelineFactory(
+          enableCompression, enableSsl, keystore,
+          keystorePassword, keystoreType);
     } else {
       pipelineFactory = new ChannelPipelineFactory() {
         @Override
@@ -313,15 +343,69 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     return Status.OK;
   }
 
-  private static class CompressionChannelPipelineFactory implements
-  ChannelPipelineFactory {
+  /**
+   * Factory of SSL-enabled server worker channel pipelines
+   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
+   */
+  private static class SSLCompressionChannelPipelineFactory
+      implements ChannelPipelineFactory {
+
+    private boolean enableCompression;
+    private boolean enableSsl;
+    private String keystore;
+    private String keystorePassword;
+    private String keystoreType;
+
+    public SSLCompressionChannelPipelineFactory(boolean enableCompression, boolean enableSsl, String keystore, String keystorePassword, String keystoreType) {
+      this.enableCompression = enableCompression;
+      this.enableSsl = enableSsl;
+      this.keystore = keystore;
+      this.keystorePassword = keystorePassword;
+      this.keystoreType = keystoreType;
+    }
+
+    private SSLContext createServerSSLContext() {
+      try {
+        KeyStore ks = KeyStore.getInstance(keystoreType);
+        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
+
+        // Set up key manager factory to use our key store
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
+        kmf.init(ks, keystorePassword.toCharArray());
+
+        SSLContext serverContext = SSLContext.getInstance("TLS");
+        serverContext.init(kmf.getKeyManagers(), null, null);
+        return serverContext;
+      } catch (Exception e) {
+        throw new Error("Failed to initialize the server-side SSLContext", e);
+      }
+    }
+
+    private String getAlgorithm() {
+      String algorithm = Security.getProperty(
+          "ssl.KeyManagerFactory.algorithm");
+      if (algorithm == null) {
+        algorithm = "SunX509";
+      }
+      return algorithm;
+    }
 
     @Override
     public ChannelPipeline getPipeline() throws Exception {
       ChannelPipeline pipeline = Channels.pipeline();
-      ZlibEncoder encoder = new ZlibEncoder(6);
-      pipeline.addFirst("deflater", encoder);
-      pipeline.addFirst("inflater", new ZlibDecoder());
+      if (enableCompression) {
+        ZlibEncoder encoder = new ZlibEncoder(6);
+        pipeline.addFirst("deflater", encoder);
+        pipeline.addFirst("inflater", new ZlibDecoder());
+      }
+      if (enableSsl) {
+        SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
+        sslEngine.setUseClientMode(false);
+        // addFirst() will make SSL handling the first stage of decoding
+        // and the last stage of encoding this must be added after
+        // adding compression handling above
+        pipeline.addFirst("ssl", new SslHandler(sslEngine));
+      }
       return pipeline;
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index ac47ee9..202b882 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -29,6 +29,13 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Charsets;
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.security.Security;
+import java.util.concurrent.Executors;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.NettyTransceiver;
@@ -55,8 +62,14 @@ import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
 import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,6 +338,103 @@ public class TestAvroSink {
     server.close();
   }
 
+  @Test
+  public void testSslProcess() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    Server server = createSslServer(new MockAvroServer());
+
+    server.start();
+
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("ssl", String.valueOf(true));
+    context.put("trust-all-certs", String.valueOf(true));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    Configurables.configure(sink, context);
+
+    sink.start();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.START_OR_ERROR, 5000));
+
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    for (int i = 0; i < 5; i++) {
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.READY, status);
+    }
+
+    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+
+    sink.stop();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.STOP_OR_ERROR, 5000));
+
+    server.close();
+  }
+
+  @Test
+  public void testSslProcessWithTrustStore() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    Server server = createSslServer(new MockAvroServer());
+
+    server.start();
+
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("ssl", String.valueOf(true));
+    context.put("truststore", "src/test/resources/truststore.jks");
+    context.put("truststore-password", "password");
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    Configurables.configure(sink, context);
+
+    sink.start();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.START_OR_ERROR, 5000));
+
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    for (int i = 0; i < 5; i++) {
+      Sink.Status status = sink.process();
+      Assert.assertEquals(Sink.Status.READY, status);
+    }
+
+    Assert.assertEquals(Sink.Status.BACKOFF, sink.process());
+
+    sink.stop();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.STOP_OR_ERROR, 5000));
+
+    server.close();
+  }
+
   private Server createServer(AvroSourceProtocol protocol)
       throws IllegalAccessException, InstantiationException {
 
@@ -336,6 +446,215 @@ public class TestAvroSink {
   }
 
   @Test
+  public void testSslWithCompression() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp("deflate", 6);
+
+    boolean bound = false;
+
+    AvroSource source;
+    Channel sourceChannel;
+    int selectedPort;
+
+    source = new AvroSource();
+    sourceChannel = new MemoryChannel();
+
+    Configurables.configure(sourceChannel, new Context());
+
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(sourceChannel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    source.setChannelProcessor(new ChannelProcessor(rcs));
+
+    Context context = new Context();
+    context.put("port", port.toString());
+    context.put("bind", hostname);
+    context.put("threads", "50");
+    context.put("compression-type", "deflate");
+    context.put("keystore", "src/test/resources/server.p12");
+    context.put("keystore-password", "password");
+    context.put("keystore-type", "PKCS12");
+
+    Configurables.configure(source, context);
+
+    source.start();
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+
+    Event event = EventBuilder.withBody("Hello avro",
+        Charset.forName("UTF8"));
+
+    context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("ssl", String.valueOf(true));
+    context.put("trust-all-certs", String.valueOf(true));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+    context.put("compression-type", "deflate");
+    context.put("compression-level", Integer.toString(6));
+
+    Configurables.configure(sink, context);
+    sink.start();
+
+    Transaction sickTransaction = channel.getTransaction();
+
+    sickTransaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    sickTransaction.commit();
+    sickTransaction.close();
+
+    for (int i = 0; i < 5; i++) {
+      Sink.Status status = sink.process();
+      logger.debug("Calling Process " + i + " times:" + status);
+      Assert.assertEquals(Sink.Status.READY, status);
+    }
+
+    sink.stop();
+
+    Transaction sourceTransaction = sourceChannel.getTransaction();
+    sourceTransaction.begin();
+
+    Event sourceEvent = sourceChannel.take();
+    Assert.assertNotNull(sourceEvent);
+    Assert.assertEquals("Channel contained our event", "Hello avro",
+        new String(sourceEvent.getBody()));
+    sourceTransaction.commit();
+    sourceTransaction.close();
+
+    logger.debug("Round trip event:{}", sourceEvent);
+
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+  @Test
+  public void testSslSinkWithNonSslServer() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    Server server = createServer(new MockAvroServer());
+
+    server.start();
+
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("ssl", String.valueOf(true));
+    context.put("trust-all-certs", String.valueOf(true));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    Configurables.configure(sink, context);
+
+    sink.start();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.START_OR_ERROR, 5000));
+
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    boolean failed = false;
+    try {
+      for (int i = 0; i < 5; i++) {
+        sink.process();
+        failed = true;
+      }
+    } catch (EventDeliveryException ex) {
+      logger.info("Correctly failed to send event", ex);
+    }
+
+
+    sink.stop();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.STOP_OR_ERROR, 5000));
+
+    server.close();
+
+    if (failed) {
+      Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, that's wrong.");
+    }
+  }
+
+  @Test
+  public void testSslSinkWithNonTrustedCert() throws InterruptedException,
+      EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+    Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+    Server server = createSslServer(new MockAvroServer());
+
+    server.start();
+
+    Context context = new Context();
+
+    context.put("hostname", hostname);
+    context.put("port", String.valueOf(port));
+    context.put("ssl", String.valueOf(true));
+    context.put("batch-size", String.valueOf(2));
+    context.put("connect-timeout", String.valueOf(2000L));
+    context.put("request-timeout", String.valueOf(3000L));
+
+    Configurables.configure(sink, context);
+
+    sink.start();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.START_OR_ERROR, 5000));
+
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    boolean failed = false;
+    try {
+      for (int i = 0; i < 5; i++) {
+        sink.process();
+        failed = true;
+      }
+    } catch (EventDeliveryException ex) {
+      logger.info("Correctly failed to send event", ex);
+    }
+
+
+    sink.stop();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.STOP_OR_ERROR, 5000));
+
+    server.close();
+
+    if (failed) {
+      Assert.fail("SSL-enabled sink successfully connected to a server with an untrusted certificate when it should have failed");
+    }
+  }
+
+  @Test
   public void testRequestWithNoCompression() throws InterruptedException, IOException, EventDeliveryException {
 
     doRequest(false, false, 6);
@@ -505,4 +824,72 @@ public class TestAvroSink {
 
   }
 
+  private Server createSslServer(AvroSourceProtocol protocol)
+      throws IllegalAccessException, InstantiationException {
+    Server server = new NettyServer(new SpecificResponder(
+        AvroSourceProtocol.class, protocol), new InetSocketAddress(hostname, port),
+            new NioServerSocketChannelFactory(
+                    Executors.newCachedThreadPool(),
+                    Executors.newCachedThreadPool()),
+            new SSLChannelPipelineFactory(),
+            null);
+
+    return server;
+  }
+
+  /**
+   * Factory of SSL-enabled server worker channel pipelines
+   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
+   */
+  private class SSLChannelPipelineFactory
+      implements ChannelPipelineFactory {
+
+    String keystore = "src/test/resources/server.p12";
+    String keystorePassword = "password";
+    String keystoreType = "PKCS12";
+
+    public SSLChannelPipelineFactory() {
+    }
+
+    public SSLChannelPipelineFactory(String keystore, String keystorePassword, String keystoreType) {
+      this.keystore = keystore;
+      this.keystorePassword = keystorePassword;
+      this.keystoreType = keystoreType;
+    }
+
+    private SSLContext createServerSSLContext() {
+      try {
+        KeyStore ks = KeyStore.getInstance(keystoreType);
+        ks.load(new FileInputStream(keystore), keystorePassword.toCharArray());
+
+        // Set up key manager factory to use our key store
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
+        kmf.init(ks, keystorePassword.toCharArray());
+
+        SSLContext serverContext = SSLContext.getInstance("TLS");
+        serverContext.init(kmf.getKeyManagers(), null, null);
+        return serverContext;
+      } catch (Exception e) {
+        throw new Error("Failed to initialize the server-side SSLContext", e);
+      }
+    }
+
+    private String getAlgorithm() {
+      String algorithm = Security.getProperty(
+          "ssl.KeyManagerFactory.algorithm");
+      if (algorithm == null) {
+        algorithm = "SunX509";
+      }
+      return algorithm;
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
+      sslEngine.setUseClientMode(false);
+      pipeline.addLast("ssl", new SslHandler(sslEngine));
+      return pipeline;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index c699241..8fd7072 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -22,10 +22,16 @@ package org.apache.flume.source;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
 
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -49,6 +55,7 @@ import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -230,7 +237,6 @@ public class TestAvroSource {
         source.getLifecycleState());
   }
 
-
   private static class CompressionChannelFactory extends
       NioClientSocketChannelFactory {
     private int compressionLevel;
@@ -254,4 +260,116 @@ public class TestAvroSource {
     }
   }
 
+  @Test
+  public void testSslRequest() throws InterruptedException, IOException {
+    boolean bound = false;
+
+    for (int i = 0; i < 10 && !bound; i++) {
+      try {
+        Context context = new Context();
+
+        context.put("port", String.valueOf(selectedPort = 41414 + i));
+        context.put("bind", "0.0.0.0");
+        context.put("threads", "50");
+        context.put("keystore", "src/test/resources/server.p12");
+        context.put("keystore-password", "password");
+        context.put("keystore-type", "PKCS12");
+
+        Configurables.configure(source, context);
+
+        source.start();
+        bound = true;
+      } catch (ChannelException e) {
+        /*
+         * NB: This assume we're using the Netty server under the hood and the
+         * failure is to bind. Yucky.
+         */
+        Thread.sleep(100);
+      }
+    }
+
+    Assert
+        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
+            source, LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started", LifecycleState.START,
+        source.getLifecycleState());
+
+    AvroSourceProtocol client = SpecificRequestor.getClient(
+        AvroSourceProtocol.class, new NettyTransceiver(new InetSocketAddress(
+        selectedPort), new SSLChannelFactory()));
+
+    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+
+    avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+    avroEvent.setBody(ByteBuffer.wrap("Hello avro ssl".getBytes()));
+
+    Status status = client.append(avroEvent);
+
+    Assert.assertEquals(Status.OK, status);
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    Event event = channel.take();
+    Assert.assertNotNull(event);
+    Assert.assertEquals("Channel contained our event", "Hello avro ssl",
+        new String(event.getBody()));
+    transaction.commit();
+    transaction.close();
+
+    logger.debug("Round trip event:{}", event);
+
+    source.stop();
+    Assert.assertTrue("Reached stop or error",
+        LifecycleController.waitForOneOf(source, LifecycleState.STOP_OR_ERROR));
+    Assert.assertEquals("Server is stopped", LifecycleState.STOP,
+        source.getLifecycleState());
+  }
+
+  /**
+   * Factory of SSL-enabled client channels
+   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
+   */
+  private static class SSLChannelFactory extends NioClientSocketChannelFactory {
+    public SSLChannelFactory() {
+      super(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    }
+
+    @Override
+    public SocketChannel newChannel(ChannelPipeline pipeline) {
+      try {
+        SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(null, new TrustManager[]{new PermissiveTrustManager()},
+                        null);
+        SSLEngine sslEngine = sslContext.createSSLEngine();
+        sslEngine.setUseClientMode(true);
+        // addFirst() will make SSL handling the first stage of decoding
+        // and the last stage of encoding
+        pipeline.addFirst("ssl", new SslHandler(sslEngine));
+        return super.newChannel(pipeline);
+      } catch (Exception ex) {
+        throw new RuntimeException("Cannot create SSL channel", ex);
+      }
+    }
+  }
+
+  /**
+   * Bogus trust manager accepting any certificate
+   */
+  private static class PermissiveTrustManager implements X509TrustManager {
+    @Override
+    public void checkClientTrusted(X509Certificate[] certs, String s) {
+      // nothing
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] certs, String s) {
+      // nothing
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+      return new X509Certificate[0];
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-core/src/test/resources/server.p12
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/resources/server.p12 b/flume-ng-core/src/test/resources/server.p12
new file mode 100644
index 0000000..07eef51
Binary files /dev/null and b/flume-ng-core/src/test/resources/server.p12 differ

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-core/src/test/resources/truststore.jks
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/resources/truststore.jks b/flume-ng-core/src/test/resources/truststore.jks
new file mode 100644
index 0000000..8c08349
Binary files /dev/null and b/flume-ng-core/src/test/resources/truststore.jks differ

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 1b4d216..ad29f33 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -701,6 +701,9 @@ selector.*
 interceptors         --           Space-separated list of interceptors
 interceptors.*
 compression-type     none         This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
+keystore             --           The path to a Java keystore. If "keystore" and "keystore-password" are both set, then this AvroSource will us SSL.
+keystore-password    --           The password for the Java keystore.
+keystore-type        JKS          This can be "JKS" or "PKCS12". The type of the Java keystore.
 ==================   ===========  ===================================================
 
 Example for agent named a1:
@@ -1523,6 +1526,10 @@ request-timeout              20000    Amount of time (ms) to allow for requests
 reset-connection-interval    none     Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.
 compression-type             none     This can be "none" or "deflate".  The compression-type must match the compression-type of matching AvroSource
 compression-level            6	      The level of compression to compress event. 0 = no compression and 1-9 is compression.  The higher the number the more compression
+ssl                   false    Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", and "truststore-type".
+truststore            --       The path to a Java truststore file. If you enable SSL without configuring a truststore, the AvroSink will automatically use a permisive trust setting and accept any server certifacte used by the AvroSource it is connected to.
+truststore-password   --       The password for the truststore.
+truststore-type       JKS      This can be "JKS" or other supported Java truststore type. The type of the Java truststore.
 ==========================   =======  ==============================================
 
 Example for agent named a1:

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index 99bd5ae..66be934 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -18,9 +18,13 @@
  */
 package org.apache.flume.api;
 
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -41,6 +45,11 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
 
 import org.apache.avro.ipc.CallFuture;
 import org.apache.avro.ipc.NettyTransceiver;
@@ -52,14 +61,15 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
-import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.compression.ZlibDecoder;
 import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.security.validator.KeyStores;
 
 /**
  * Avro/Netty implementation of {@link RpcClient}.
@@ -78,6 +88,11 @@ implements RpcClient {
   private ConnState connState;
 
   private InetSocketAddress address;
+  private boolean enableSsl;
+  private boolean trustAllCerts;
+  private String truststore;
+  private String truststorePassword;
+  private String truststoreType;
 
   private Transceiver transceiver;
   private AvroSourceProtocol.Callback avroClient;
@@ -114,12 +129,14 @@ implements RpcClient {
 
     try {
 
-      if (enableDeflateCompression) {
-        socketChannelFactory = new CompressionChannelFactory(
+      if (enableDeflateCompression || enableSsl) {
+        socketChannelFactory = new SSLCompressionChannelFactory(
             Executors.newCachedThreadPool(new TransceiverThreadFactory(
                 "Avro " + NettyTransceiver.class.getSimpleName() + " Boss")),
             Executors.newCachedThreadPool(new TransceiverThreadFactory(
-                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")), compressionLevel);
+                "Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker")),
+            enableDeflateCompression, enableSsl, trustAllCerts, compressionLevel,
+            truststore, truststorePassword, truststoreType);
       } else {
         socketChannelFactory = new NioClientSocketChannelFactory(
             Executors.newCachedThreadPool(new TransceiverThreadFactory(
@@ -560,6 +577,16 @@ implements RpcClient {
       }
     }
 
+    enableSsl = Boolean.parseBoolean(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_SSL));
+    trustAllCerts = Boolean.parseBoolean(properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_TRUST_ALL_CERTS));
+    truststore = properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE);
+    truststorePassword = properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_PASSWORD);
+    truststoreType = properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_TRUSTSTORE_TYPE, "JKS");
 
     this.connect();
   }
@@ -596,27 +623,101 @@ implements RpcClient {
     }
   }
 
-  private static class CompressionChannelFactory extends
-      NioClientSocketChannelFactory {
-    private int compressionLevel;
+  /**
+   * Factory of SSL-enabled client channels
+   * Copied from Avro's org.apache.avro.ipc.TestNettyServerWithSSL test
+   */
+  private static class SSLCompressionChannelFactory extends NioClientSocketChannelFactory {
 
-    public CompressionChannelFactory(
-        Executor bossExecutor, Executor workerExecutor, int compressionLevel) {
+    private boolean enableCompression;
+    private int compressionLevel;
+    private boolean enableSsl;
+    private boolean trustAllCerts;
+    private String truststore;
+    private String truststorePassword;
+    private String truststoreType;
+
+    public SSLCompressionChannelFactory(Executor bossExecutor, Executor workerExecutor,
+        boolean enableCompression, boolean enableSsl, boolean trustAllCerts,
+        int compressionLevel, String truststore, String truststorePassword,
+        String truststoreType) {
       super(bossExecutor, workerExecutor);
+      this.enableCompression = enableCompression;
+      this.enableSsl = enableSsl;
       this.compressionLevel = compressionLevel;
+      this.trustAllCerts = trustAllCerts;
+      this.truststore = truststore;
+      this.truststorePassword = truststorePassword;
+      this.truststoreType = truststoreType;
     }
 
     @Override
     public SocketChannel newChannel(ChannelPipeline pipeline) {
+      TrustManager[] managers;
       try {
+        if (enableCompression) {
+          ZlibEncoder encoder = new ZlibEncoder(compressionLevel);
+          pipeline.addFirst("deflater", encoder);
+          pipeline.addFirst("inflater", new ZlibDecoder());
+        }
+        if (enableSsl) {
+          if (trustAllCerts) {
+            logger.warn("No truststore configured, setting TrustManager to accept"
+                + " all server certificates");
+            managers = new TrustManager[] { new PermissiveTrustManager() };
+          } else {
+            InputStream truststoreStream = null;
+            if (truststore == null) {
+              truststoreType = "JKS";
+              truststoreStream = getClass().getClassLoader().getResourceAsStream("cacerts");
+              truststorePassword = "changeit";
+            } else {
+              truststoreStream = new FileInputStream(truststore);
+            }
+            KeyStore keystore = KeyStore.getInstance(truststoreType);
+            keystore.load(truststoreStream, truststorePassword.toCharArray());
+
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+            tmf.init(keystore);
+            managers = tmf.getTrustManagers();
+          }
+
+          SSLContext sslContext = SSLContext.getInstance("TLS");
+          sslContext.init(null, managers,
+                          null);
+          SSLEngine sslEngine = sslContext.createSSLEngine();
+          sslEngine.setUseClientMode(true);
+          // addFirst() will make SSL handling the first stage of decoding
+          // and the last stage of encoding this must be added after
+          // adding compression handling above
+          pipeline.addFirst("ssl", new SslHandler(sslEngine));
+        }
 
-        ZlibEncoder encoder = new ZlibEncoder(compressionLevel);
-        pipeline.addFirst("deflater", encoder);
-        pipeline.addFirst("inflater", new ZlibDecoder());
         return super.newChannel(pipeline);
       } catch (Exception ex) {
-        throw new RuntimeException("Cannot create Compression channel", ex);
+        logger.error("Cannot create SSL channel", ex);
+        throw new RuntimeException("Cannot create SSL channel", ex);
       }
     }
   }
+
+  /**
+   * Permissive trust manager accepting any certificate
+   */
+  private static class PermissiveTrustManager implements X509TrustManager {
+    @Override
+    public void checkClientTrusted(X509Certificate[] certs, String s) {
+      // nothing
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] certs, String s) {
+      // nothing
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+      return new X509Certificate[0];
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/a964e7ab/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 34d73a3..7aa70cb 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -135,6 +135,15 @@ public final class RpcClientConfigurationConstants {
   public static final int DEFAULT_COMPRESSION_LEVEL = 6;
 
 
+  /**
+   * Configuration constants for SSL support
+   */
+  public static final String CONFIG_SSL = "ssl";
+  public static final String CONFIG_TRUST_ALL_CERTS = "trust-all-certs";
+  public static final String CONFIG_TRUSTSTORE = "truststore";
+  public static final String CONFIG_TRUSTSTORE_PASSWORD = "truststore-password";
+  public static final String CONFIG_TRUSTSTORE_TYPE = "truststore-type";
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }