You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 14:08:30 UTC

[1/2] git commit: Add SSL support to the binary protocol

Add SSL support to the binary protocol

patch by jasobrown; reviewed by slebresne for CASSANDRA-5031


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/119c726f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/119c726f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/119c726f

Branch: refs/heads/trunk
Commit: 119c726f490bcc94b4dcbcbe9d8f8f9cc1464edc
Parents: 8219195
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 2 14:06:49 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 2 14:06:49 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/security/SSLFactory.java  |   40 +++++----
 .../org/apache/cassandra/transport/Client.java     |   26 ++++-
 .../org/apache/cassandra/transport/Server.java     |   73 ++++++++++++--
 .../apache/cassandra/transport/SimpleClient.java   |   74 +++++++++++++--
 5 files changed, 173 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca4780a..cc9b145 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
  * Expose black-listed directories via JMX (CASSANDRA-4848)
  * Log compaction merge counts (CASSANDRA-4894)
  * Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090)
+ * Add SSL support for the binary protocol (CASSANDRA-5031)
 
 
 1.2.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/security/SSLFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java
index 4d21842..5e64c43 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -29,6 +29,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
 import org.apache.cassandra.config.EncryptionOptions;
@@ -49,7 +50,7 @@ public final class SSLFactory
 
     public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
     {
-        SSLContext ctx = createSSLContext(options);
+        SSLContext ctx = createSSLContext(options, true);
         SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket();
         serverSocket.setReuseAddress(true);
         String[] suits = filterCipherSuites(serverSocket.getSupportedCipherSuites(), options.cipher_suites);
@@ -61,7 +62,7 @@ public final class SSLFactory
     /** Create a socket and connect */
     public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException
     {
-        SSLContext ctx = createSSLContext(options);
+        SSLContext ctx = createSSLContext(options, true);
         SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort);
         String[] suits = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
         socket.setEnabledCipherSuites(suits);
@@ -71,7 +72,7 @@ public final class SSLFactory
     /** Create a socket and connect, using any local address */
     public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
     {
-        SSLContext ctx = createSSLContext(options);
+        SSLContext ctx = createSSLContext(options, true);
         SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port);
         String[] suits = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
         socket.setEnabledCipherSuites(suits);
@@ -81,35 +82,40 @@ public final class SSLFactory
     /** Just create a socket */
     public static SSLSocket getSocket(EncryptionOptions options) throws IOException
     {
-        SSLContext ctx = createSSLContext(options);
+        SSLContext ctx = createSSLContext(options, true);
         SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket();
         String[] suits = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
         socket.setEnabledCipherSuites(suits);
         return socket;
     }
 
-    private static SSLContext createSSLContext(EncryptionOptions options) throws IOException
+    public static SSLContext createSSLContext(EncryptionOptions options, boolean buildTruststore) throws IOException
     {
-        FileInputStream tsf = new FileInputStream(options.truststore);
-        FileInputStream ksf = new FileInputStream(options.keystore);
+        FileInputStream tsf = null;
+        FileInputStream ksf = null;
         SSLContext ctx;
         try
         {
             ctx = SSLContext.getInstance(options.protocol);
-            TrustManagerFactory tmf;
-            KeyManagerFactory kmf;
-
-            tmf = TrustManagerFactory.getInstance(options.algorithm);
-            KeyStore ts = KeyStore.getInstance(options.store_type);
-            ts.load(tsf, options.truststore_password.toCharArray());
-            tmf.init(ts);
-
-            kmf = KeyManagerFactory.getInstance(options.algorithm);
+            TrustManager[] trustManagers = null;
+
+            if(buildTruststore)
+            {
+                tsf = new FileInputStream(options.truststore);
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(options.algorithm);
+                KeyStore ts = KeyStore.getInstance(options.store_type);
+                ts.load(tsf, options.truststore_password.toCharArray());
+                tmf.init(ts);
+                trustManagers = tmf.getTrustManagers();
+            }
+
+            ksf = new FileInputStream(options.keystore);
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(options.algorithm);
             KeyStore ks = KeyStore.getInstance(options.store_type);
             ks.load(ksf, options.keystore_password.toCharArray());
             kmf.init(ks, options.keystore_password.toCharArray());
 
-            ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+            ctx.init(kmf.getKeyManagers(), trustManagers, null);
 
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 46a3297..cde8be5 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -21,20 +21,33 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Splitter;
 
-import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.transport.messages.CredentialsMessage;
+import org.apache.cassandra.transport.messages.ExecuteMessage;
+import org.apache.cassandra.transport.messages.OptionsMessage;
+import org.apache.cassandra.transport.messages.PrepareMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.RegisterMessage;
+import org.apache.cassandra.transport.messages.StartupMessage;
 import org.apache.cassandra.utils.Hex;
+import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
 public class Client extends SimpleClient
 {
-    public Client(String host, int port)
+    public Client(String host, int port, ClientEncryptionOptions encryptionOptions)
     {
-        super(host, port);
+        super(host, port, encryptionOptions);
     }
 
     public void run() throws IOException
@@ -181,9 +194,10 @@ public class Client extends SimpleClient
         String host = args[0];
         int port = Integer.parseInt(args[1]);
 
+        ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions();
         System.out.println("CQL binary protocol console " + host + "@" + port);
 
-        new Client(host, port).run();
+        new Client(host, port, encryptionOptions).run();
         System.exit(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index c3fabfb..0b43a4a 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -17,30 +17,41 @@
  */
 package org.apache.cassandra.transport;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.EnumMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Slf4JLoggerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
 import org.apache.cassandra.service.IMigrationListener;
 import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.EventMessage;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Slf4JLoggerFactory;
 
 public class Server implements CassandraDaemon.Server
 {
@@ -109,7 +120,16 @@ public class Server implements CassandraDaemon.Server
         bootstrap.setOption("child.tcpNoDelay", true);
 
         // Set up the event pipeline factory.
-        bootstrap.setPipelineFactory(new PipelineFactory(this));
+        final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
+        if (clientEnc.enabled)
+        {
+            logger.info("enabling encrypted CQL connections between client and server");
+            bootstrap.setPipelineFactory(new SecurePipelineFactory(this, clientEnc));
+        }
+        else
+        {
+            bootstrap.setPipelineFactory(new PipelineFactory(this));
+        }
 
         // Bind and start to accept incoming connections.
         logger.info("Starting listening for CQL clients on " + socket + "...");
@@ -205,6 +225,39 @@ public class Server implements CassandraDaemon.Server
       }
     }
 
+    private static class SecurePipelineFactory extends PipelineFactory // PipelineFactory variant that puts an SslHandler first in each pipeline
+    {
+        private final SSLContext sslContext; // built once in the constructor; every getPipeline() call creates a fresh engine from it
+        private final EncryptionOptions encryptionOptions;
+
+        public SecurePipelineFactory(Server server, EncryptionOptions encryptionOptions)
+        {
+            super(server);
+            this.encryptionOptions = encryptionOptions;
+            try
+            {
+                this.sslContext = SSLFactory.createSSLContext(encryptionOptions, false); // false: createSSLContext does not load options.truststore
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException("Failed to setup secure pipeline", e); // constructor declares no checked exception, so wrap and keep the cause
+            }
+        }
+
+        public ChannelPipeline getPipeline() throws Exception
+        {
+            SSLEngine sslEngine = sslContext.createSSLEngine();
+            sslEngine.setUseClientMode(false); // this end acts as the server in the handshake
+            sslEngine.setEnabledCipherSuites(encryptionOptions.cipher_suites); // NOTE(review): SSLFactory's socket methods run cipher_suites through filterCipherSuites(supported, ...) first; here the list is used as-is - confirm intended
+
+            SslHandler sslHandler = new SslHandler(sslEngine);
+            sslHandler.setIssueHandshake(true); // presumably lets the handler start the handshake itself once connected - confirm against Netty 3 SslHandler docs
+            ChannelPipeline pipeline = super.getPipeline();
+            pipeline.addFirst("ssl", sslHandler); // placed ahead of every handler added by super.getPipeline()
+            return pipeline;
+        }
+    }
+
     private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
     {
         private final Server server;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 580a3a0..f2963bd 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -20,24 +20,44 @@ package org.apache.cassandra.transport;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.concurrent.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.transport.messages.CredentialsMessage;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+import org.apache.cassandra.transport.messages.ExecuteMessage;
+import org.apache.cassandra.transport.messages.PrepareMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.transport.messages.StartupMessage;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.logging.LoggingHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
 import org.jboss.netty.logging.InternalLoggerFactory;
 import org.jboss.netty.logging.Slf4JLoggerFactory;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.transport.messages.*;
+import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
 public class SimpleClient
 {
@@ -46,8 +66,10 @@ public class SimpleClient
         InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
     }
 
+    private static final Logger logger = LoggerFactory.getLogger(SimpleClient.class);
     public final String host;
     public final int port;
+    private final ClientEncryptionOptions encryptionOptions;
 
     protected final ResponseHandler responseHandler = new ResponseHandler();
     protected final Connection.Tracker tracker = new ConnectionTracker();
@@ -64,10 +86,11 @@ public class SimpleClient
         }
     };
 
-    public SimpleClient(String host, int port)
+    public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions)
     {
         this.host = host;
         this.port = port;
+        this.encryptionOptions = encryptionOptions;
     }
 
     public void connect(boolean useCompression) throws IOException
@@ -95,7 +118,14 @@ public class SimpleClient
         bootstrap.setOption("tcpNoDelay", true);
 
         // Configure the pipeline factory.
-        bootstrap.setPipelineFactory(new PipelineFactory());
+        if(encryptionOptions.enabled)
+        {
+            bootstrap.setPipelineFactory(new SecurePipelineFactory());
+        }
+        else
+        {
+            bootstrap.setPipelineFactory(new PipelineFactory());
+        }
         ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
 
         // Wait until the connection attempt succeeds or fails.
@@ -202,6 +232,27 @@ public class SimpleClient
         }
     }
 
+    private class SecurePipelineFactory extends PipelineFactory // non-static inner class: reads the enclosing SimpleClient's encryptionOptions
+    {
+        private final SSLContext sslContext; // built once in the constructor; every getPipeline() call creates a fresh engine from it
+
+        public SecurePipelineFactory() throws IOException
+        {
+            this.sslContext = SSLFactory.createSSLContext(encryptionOptions, true); // true: createSSLContext also loads options.truststore
+        }
+
+        public ChannelPipeline getPipeline() throws Exception
+        {
+            SSLEngine sslEngine = sslContext.createSSLEngine();
+            sslEngine.setUseClientMode(true); // this end acts as the client in the handshake
+            sslEngine.setEnabledCipherSuites(encryptionOptions.cipher_suites); // NOTE(review): SSLFactory's socket methods run cipher_suites through filterCipherSuites(supported, ...) first; here the list is used as-is - confirm intended
+            ChannelPipeline pipeline = super.getPipeline();
+
+            pipeline.addFirst("ssl", new SslHandler(sslEngine)); // placed ahead of every handler added by super.getPipeline()
+            return pipeline;
+        }
+    }
+
     private static class ResponseHandler extends SimpleChannelUpstreamHandler
     {
         public final BlockingQueue<Message.Response> responses = new SynchronousQueue<Message.Response>(true);
@@ -219,5 +270,12 @@ public class SimpleClient
                 throw new RuntimeException(ie);
             }
         }
+
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+        {
+            if (this == ctx.getPipeline().getLast()) // log only when this handler is the last one in the pipeline
+                logger.error("Exception in response", e.getCause());
+            ctx.sendUpstream(e); // the event is forwarded upstream whether or not it was logged
+        }
     }
 }