You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/18 23:18:05 UTC
[2/3] git commit: add client encryption support to sstableloader
patch by Sam Tunnicliffe; reviewed by Mikhail Stepura for CASSANDRA-6378
add client encryption support to sstableloader
patch by Sam Tunnicliffe; reviewed by Mikhail Stepura for CASSANDRA-6378
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b2a1903
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b2a1903
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b2a1903
Branch: refs/heads/trunk
Commit: 1b2a190379141094a986495bd1386e720786c9b7
Parents: 21bb531
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Dec 18 16:17:13 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Dec 18 16:17:13 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/tools/BulkLoader.java | 130 ++++++++++++++++++-
2 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b2a1903/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b876204..d6223be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.4
+ * add client encryption support to sstableloader (CASSANDRA-6378)
* Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
* Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
* Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b2a1903/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index c89bb83..15c8df8 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,7 +24,9 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.commons.cli.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -58,12 +60,21 @@ public class BulkLoader
private static final String USER_OPTION = "username";
private static final String PASSWD_OPTION = "password";
private static final String THROTTLE_MBITS = "throttle";
+ private static final String TRANSPORT_FACTORY = "transport-factory";
+ private static final String SSL_TRUSTSTORE = "truststore";
+ private static final String SSL_TRUSTSTORE_PW = "truststore-password";
+ private static final String SSL_KEYSTORE = "keystore";
+ private static final String SSL_KEYSTORE_PW = "keystore-password";
+ private static final String SSL_PROTOCOL = "ssl-protocol";
+ private static final String SSL_ALGORITHM = "ssl-alg";
+ private static final String SSL_STORE_TYPE = "store-type";
+ private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
public static void main(String args[])
{
LoaderOptions options = LoaderOptions.parseArgs(args);
OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd), handler);
+ SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
StreamResultFuture future = loader.stream(options.ignores);
future.addEventListener(new ProgressIndicator());
@@ -175,14 +186,16 @@ public class BulkLoader
private final int rpcPort;
private final String user;
private final String passwd;
+ private final ITransportFactory transportFactory;
- public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd)
+ public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd, ITransportFactory transportFactory)
{
super();
this.hosts = hosts;
this.rpcPort = port;
this.user = user;
this.passwd = passwd;
+ this.transportFactory = transportFactory;
}
public void init(String keyspace)
@@ -194,7 +207,7 @@ public class BulkLoader
{
// Query endpoint to ranges map and schemas from thrift
InetAddress host = hostiter.next();
- Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd);
+ Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd, this.transportFactory);
setPartitioner(client.describe_partitioner());
Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
@@ -233,11 +246,9 @@ public class BulkLoader
return knownCfs.get(cfName);
}
- private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd) throws Exception
+ private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd, ITransportFactory transportFactory) throws Exception
{
- TSocket socket = new TSocket(host, port);
- TTransport trans = new TFramedTransport(socket);
- trans.open();
+ TTransport trans = transportFactory.openTransport(host, port);
TProtocol protocol = new TBinaryProtocol(trans);
Cassandra.Client client = new Cassandra.Client(protocol);
if (user != null && passwd != null)
@@ -263,6 +274,8 @@ public class BulkLoader
public String user;
public String passwd;
public int throttle = 0;
+ public ITransportFactory transportFactory = new TFramedTransportFactory();
+ public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
public final Set<InetAddress> hosts = new HashSet<InetAddress>();
public final Set<InetAddress> ignores = new HashSet<InetAddress>();
@@ -367,6 +380,55 @@ public class BulkLoader
}
}
+ if(cmd.hasOption(SSL_TRUSTSTORE))
+ {
+ opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
+ }
+
+ if(cmd.hasOption(SSL_TRUSTSTORE_PW))
+ {
+ opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
+ }
+
+ if(cmd.hasOption(SSL_KEYSTORE))
+ {
+ opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE);
+ // if a keystore was provided, lets assume we'll need to use it
+ opts.encOptions.require_client_auth = true;
+ }
+
+ if(cmd.hasOption(SSL_KEYSTORE_PW))
+ {
+ opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW);
+ }
+
+ if(cmd.hasOption(SSL_PROTOCOL))
+ {
+ opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
+ }
+
+ if(cmd.hasOption(SSL_ALGORITHM))
+ {
+ opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
+ }
+
+ if(cmd.hasOption(SSL_STORE_TYPE))
+ {
+ opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
+ }
+
+ if(cmd.hasOption(SSL_CIPHER_SUITES))
+ {
+ opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
+ }
+
+ if (cmd.hasOption(TRANSPORT_FACTORY))
+ {
+ ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
+ configureTransportFactory(transportFactory, opts);
+ opts.transportFactory = transportFactory;
+ }
+
return opts;
}
catch (ParseException e)
@@ -376,6 +438,50 @@ public class BulkLoader
}
}
+ private static ITransportFactory getTransportFactory(String transportFactory)
+ {
+ try
+ {
+ Class<?> factory = Class.forName(transportFactory);
+ if (!ITransportFactory.class.isAssignableFrom(factory))
+ throw new IllegalArgumentException(String.format("transport factory '%s' " +
+ "not derived from ITransportFactory", transportFactory));
+ return (ITransportFactory) factory.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
+ }
+ }
+
+ private static void configureTransportFactory(ITransportFactory transportFactory, LoaderOptions opts)
+ {
+ Map<String, String> options = new HashMap<>();
+ // If the supplied factory supports the same set of options as our SSL impl, set those
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
+ options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
+ options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
+ options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
+ options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites));
+
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
+ && opts.encOptions.require_client_auth)
+ options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
+ && opts.encOptions.require_client_auth)
+ options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password);
+
+ // Now check if any of the factory's supported options are set as system properties
+ for (String optionKey : transportFactory.supportedOptions())
+ if (System.getProperty(optionKey) != null)
+ options.put(optionKey, System.getProperty(optionKey));
+
+ transportFactory.setOptions(options);
+ }
+
private static void errorMsg(String msg, CmdLineOptions options)
{
System.err.println(msg);
@@ -395,6 +501,16 @@ public class BulkLoader
options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
+ options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
+ // ssl connection-related options
+ options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
+ options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");
+ options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "SSL: full path to keystore");
+ options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "SSL: password of the keystore");
+ options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol to use (default: TLS)");
+ options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default: SunX509)");
+ options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store");
+ options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated list of encryption suites to use");
return options;
}