You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/02 14:08:30 UTC
[1/2] git commit: Add SSL support to the binary protocol
Add SSL support to the binary protocol
patch by jasobrown; reviewed by slebresne for CASSANDRA-5031
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/119c726f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/119c726f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/119c726f
Branch: refs/heads/trunk
Commit: 119c726f490bcc94b4dcbcbe9d8f8f9cc1464edc
Parents: 8219195
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 2 14:06:49 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 2 14:06:49 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/security/SSLFactory.java | 40 +++++----
.../org/apache/cassandra/transport/Client.java | 26 ++++-
.../org/apache/cassandra/transport/Server.java | 73 ++++++++++++--
.../apache/cassandra/transport/SimpleClient.java | 74 +++++++++++++--
5 files changed, 173 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ca4780a..cc9b145 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
* Expose black-listed directories via JMX (CASSANDRA-4848)
* Log compaction merge counts (CASSANDRA-4894)
* Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090)
+ * Add SSL support for the binary protocol (CASSANDRA-5031)
1.2.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/security/SSLFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java
index 4d21842..5e64c43 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -29,6 +29,7 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLSocket;
+import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.apache.cassandra.config.EncryptionOptions;
@@ -49,7 +50,7 @@ public final class SSLFactory
public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
{
- SSLContext ctx = createSSLContext(options);
+ SSLContext ctx = createSSLContext(options, true);
SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket();
serverSocket.setReuseAddress(true);
String[] suits = filterCipherSuites(serverSocket.getSupportedCipherSuites(), options.cipher_suites);
@@ -61,7 +62,7 @@ public final class SSLFactory
/** Create a socket and connect */
public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException
{
- SSLContext ctx = createSSLContext(options);
+ SSLContext ctx = createSSLContext(options, true);
SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort);
String[] suits = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
socket.setEnabledCipherSuites(suits);
@@ -71,7 +72,7 @@ public final class SSLFactory
/** Create a socket and connect, using any local address */
public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
{
- SSLContext ctx = createSSLContext(options);
+ SSLContext ctx = createSSLContext(options, true);
SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port);
String[] suits = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
socket.setEnabledCipherSuites(suits);
@@ -81,35 +82,40 @@ public final class SSLFactory
/** Just create a socket */
public static SSLSocket getSocket(EncryptionOptions options) throws IOException
{
- SSLContext ctx = createSSLContext(options);
+ SSLContext ctx = createSSLContext(options, true);
SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket();
String[] suits = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
socket.setEnabledCipherSuites(suits);
return socket;
}
- private static SSLContext createSSLContext(EncryptionOptions options) throws IOException
+ public static SSLContext createSSLContext(EncryptionOptions options, boolean buildTruststore) throws IOException
{
- FileInputStream tsf = new FileInputStream(options.truststore);
- FileInputStream ksf = new FileInputStream(options.keystore);
+ FileInputStream tsf = null;
+ FileInputStream ksf = null;
SSLContext ctx;
try
{
ctx = SSLContext.getInstance(options.protocol);
- TrustManagerFactory tmf;
- KeyManagerFactory kmf;
-
- tmf = TrustManagerFactory.getInstance(options.algorithm);
- KeyStore ts = KeyStore.getInstance(options.store_type);
- ts.load(tsf, options.truststore_password.toCharArray());
- tmf.init(ts);
-
- kmf = KeyManagerFactory.getInstance(options.algorithm);
+ TrustManager[] trustManagers = null;
+
+ if(buildTruststore)
+ {
+ tsf = new FileInputStream(options.truststore);
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(options.algorithm);
+ KeyStore ts = KeyStore.getInstance(options.store_type);
+ ts.load(tsf, options.truststore_password.toCharArray());
+ tmf.init(ts);
+ trustManagers = tmf.getTrustManagers();
+ }
+
+ ksf = new FileInputStream(options.keystore);
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(options.algorithm);
KeyStore ks = KeyStore.getInstance(options.store_type);
ks.load(ksf, options.keystore_password.toCharArray());
kmf.init(ks, options.keystore_password.toCharArray());
- ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ ctx.init(kmf.getKeyManagers(), trustManagers, null);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 46a3297..cde8be5 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -21,20 +21,33 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import com.google.common.base.Splitter;
-import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.transport.messages.CredentialsMessage;
+import org.apache.cassandra.transport.messages.ExecuteMessage;
+import org.apache.cassandra.transport.messages.OptionsMessage;
+import org.apache.cassandra.transport.messages.PrepareMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.RegisterMessage;
+import org.apache.cassandra.transport.messages.StartupMessage;
import org.apache.cassandra.utils.Hex;
+import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
public class Client extends SimpleClient
{
- public Client(String host, int port)
+ public Client(String host, int port, ClientEncryptionOptions encryptionOptions)
{
- super(host, port);
+ super(host, port, encryptionOptions);
}
public void run() throws IOException
@@ -181,9 +194,10 @@ public class Client extends SimpleClient
String host = args[0];
int port = Integer.parseInt(args[1]);
+ ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions();
System.out.println("CQL binary protocol console " + host + "@" + port);
- new Client(host, port).run();
+ new Client(host, port, encryptionOptions).run();
System.exit(0);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index c3fabfb..0b43a4a 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -17,30 +17,41 @@
*/
package org.apache.cassandra.transport;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.EnumMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.logging.InternalLoggerFactory;
-import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
import org.apache.cassandra.service.IMigrationListener;
import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.EventMessage;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Slf4JLoggerFactory;
public class Server implements CassandraDaemon.Server
{
@@ -109,7 +120,16 @@ public class Server implements CassandraDaemon.Server
bootstrap.setOption("child.tcpNoDelay", true);
// Set up the event pipeline factory.
- bootstrap.setPipelineFactory(new PipelineFactory(this));
+ final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
+ if (clientEnc.enabled)
+ {
+ logger.info("enabling encrypted CQL connections between client and server");
+ bootstrap.setPipelineFactory(new SecurePipelineFactory(this, clientEnc));
+ }
+ else
+ {
+ bootstrap.setPipelineFactory(new PipelineFactory(this));
+ }
// Bind and start to accept incoming connections.
logger.info("Starting listening for CQL clients on " + socket + "...");
@@ -205,6 +225,39 @@ public class Server implements CassandraDaemon.Server
}
}
+ private static class SecurePipelineFactory extends PipelineFactory
+ {
+ private final SSLContext sslContext;
+ private final EncryptionOptions encryptionOptions;
+
+ public SecurePipelineFactory(Server server, EncryptionOptions encryptionOptions)
+ {
+ super(server);
+ this.encryptionOptions = encryptionOptions;
+ try
+ {
+ this.sslContext = SSLFactory.createSSLContext(encryptionOptions, false);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Failed to setup secure pipeline", e);
+ }
+ }
+
+ public ChannelPipeline getPipeline() throws Exception
+ {
+ SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ sslEngine.setEnabledCipherSuites(encryptionOptions.cipher_suites);
+
+ SslHandler sslHandler = new SslHandler(sslEngine);
+ sslHandler.setIssueHandshake(true);
+ ChannelPipeline pipeline = super.getPipeline();
+ pipeline.addFirst("ssl", sslHandler);
+ return pipeline;
+ }
+ }
+
private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener
{
private final Server server;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/119c726f/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 580a3a0..f2963bd 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -20,24 +20,44 @@ package org.apache.cassandra.transport;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.concurrent.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.transport.messages.CredentialsMessage;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+import org.apache.cassandra.transport.messages.ExecuteMessage;
+import org.apache.cassandra.transport.messages.PrepareMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.transport.messages.StartupMessage;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.logging.LoggingHandler;
+import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
-
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.transport.messages.*;
+import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
public class SimpleClient
{
@@ -46,8 +66,10 @@ public class SimpleClient
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
}
+ private static final Logger logger = LoggerFactory.getLogger(SimpleClient.class);
public final String host;
public final int port;
+ private final ClientEncryptionOptions encryptionOptions;
protected final ResponseHandler responseHandler = new ResponseHandler();
protected final Connection.Tracker tracker = new ConnectionTracker();
@@ -64,10 +86,11 @@ public class SimpleClient
}
};
- public SimpleClient(String host, int port)
+ public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions)
{
this.host = host;
this.port = port;
+ this.encryptionOptions = encryptionOptions;
}
public void connect(boolean useCompression) throws IOException
@@ -95,7 +118,14 @@ public class SimpleClient
bootstrap.setOption("tcpNoDelay", true);
// Configure the pipeline factory.
- bootstrap.setPipelineFactory(new PipelineFactory());
+ if(encryptionOptions.enabled)
+ {
+ bootstrap.setPipelineFactory(new SecurePipelineFactory());
+ }
+ else
+ {
+ bootstrap.setPipelineFactory(new PipelineFactory());
+ }
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
@@ -202,6 +232,27 @@ public class SimpleClient
}
}
+ private class SecurePipelineFactory extends PipelineFactory
+ {
+ private final SSLContext sslContext;
+
+ public SecurePipelineFactory() throws IOException
+ {
+ this.sslContext = SSLFactory.createSSLContext(encryptionOptions, true);
+ }
+
+ public ChannelPipeline getPipeline() throws Exception
+ {
+ SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(true);
+ sslEngine.setEnabledCipherSuites(encryptionOptions.cipher_suites);
+ ChannelPipeline pipeline = super.getPipeline();
+
+ pipeline.addFirst("ssl", new SslHandler(sslEngine));
+ return pipeline;
+ }
+ }
+
private static class ResponseHandler extends SimpleChannelUpstreamHandler
{
public final BlockingQueue<Message.Response> responses = new SynchronousQueue<Message.Response>(true);
@@ -219,5 +270,12 @@ public class SimpleClient
throw new RuntimeException(ie);
}
}
+
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+ {
+ if (this == ctx.getPipeline().getLast())
+ logger.error("Exception in response", e.getCause());
+ ctx.sendUpstream(e);
+ }
}
}