You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/31 07:39:51 UTC
git commit: Switch stress to use ITransportFactory
Updated Branches:
refs/heads/cassandra-2.0 36af40925 -> e7b23cc74
Switch stress to use ITransportFactory
patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6641
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b23cc7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b23cc7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b23cc7
Branch: refs/heads/cassandra-2.0
Commit: e7b23cc741622ccf8b1487d8622cca47dcb9cc34
Parents: 36af409
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 31 01:38:54 2014 -0500
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 31 01:38:54 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/cli/CliOptions.java | 45 +++++++++++--
.../org/apache/cassandra/stress/Session.java | 69 +++++++++++++-------
3 files changed, 88 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 56a72ef..94b21d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
* Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
* SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
* sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
+ * Switch stress to use ITransportFactory (CASSANDRA-6641)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/src/java/org/apache/cassandra/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java
index 68f17c9..7894bf9 100644
--- a/src/java/org/apache/cassandra/cli/CliOptions.java
+++ b/src/java/org/apache/cassandra/cli/CliOptions.java
@@ -17,9 +17,15 @@
*/
package org.apache.cassandra.cli;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
import org.apache.commons.cli.*;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.thrift.SSLTransportFactory;
/**
*
@@ -114,11 +120,6 @@ public class CliOptions
css.hostName = DEFAULT_HOST;
}
- if (cmd.hasOption(TRANSPORT_FACTORY))
- {
- css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
- }
-
if (cmd.hasOption(DEBUG_OPTION))
{
css.debug = true;
@@ -217,6 +218,12 @@ public class CliOptions
css.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
}
+ if (cmd.hasOption(TRANSPORT_FACTORY))
+ {
+ css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
+ configureTransportFactory(css.transportFactory, css.encOptions);
+ }
+
// Abort if there are any unrecognized arguments left
if (cmd.getArgs().length > 0)
{
@@ -281,4 +288,32 @@ public class CliOptions
throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
}
}
+
+ private static void configureTransportFactory(ITransportFactory transportFactory, EncryptionOptions encOptions)
+ {
+ Map<String, String> options = new HashMap<>();
+ // If the supplied factory supports the same set of options as our SSL impl, set those
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
+ options.put(SSLTransportFactory.TRUSTSTORE, encOptions.truststore);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
+ options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, encOptions.truststore_password);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
+ options.put(SSLTransportFactory.PROTOCOL, encOptions.protocol);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
+ options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(encOptions.cipher_suites));
+
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
+ && encOptions.require_client_auth)
+ options.put(SSLTransportFactory.KEYSTORE, encOptions.keystore);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
+ && encOptions.require_client_auth)
+ options.put(SSLTransportFactory.KEYSTORE_PASSWORD, encOptions.keystore_password);
+
+ // Now check if any of the factory's supported options are set as system properties
+ for (String optionKey : transportFactory.supportedOptions())
+ if (System.getProperty(optionKey) != null)
+ options.put(optionKey, System.getProperty(optionKey));
+
+ transportFactory.setOptions(options);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index 242fa14..9ac865d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -24,29 +24,25 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.commons.lang3.StringUtils;
import com.yammer.metrics.Metrics;
import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.cli.transport.FramedTransportFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.commons.cli.*;
-
-import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.thrift.*;
-import org.apache.commons.lang3.StringUtils;
-
+import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportFactory;
public class Session implements Serializable
{
@@ -175,7 +171,7 @@ public class Session implements Serializable
public final boolean timeUUIDComparator;
public double traceProbability = 0.0;
public EncryptionOptions encOptions = new ClientEncryptionOptions();
- public TTransportFactory transportFactory = new FramedTransportFactory();
+ public ITransportFactory transportFactory = new TFramedTransportFactory();
public Session(String[] arguments) throws IllegalArgumentException, SyntaxException
{
@@ -455,7 +451,10 @@ public class Session implements Serializable
encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
if (cmd.hasOption("tf"))
+ {
transportFactory = validateAndSetTransportFactory(cmd.getOptionValue("tf"));
+ configureTransportFactory(transportFactory, encOptions);
+ }
if (cmd.hasOption("un"))
username = cmd.getOptionValue("un");
@@ -476,17 +475,17 @@ public class Session implements Serializable
sigma = numDifferentKeys * STDev;
}
- private TTransportFactory validateAndSetTransportFactory(String transportFactory)
+ private ITransportFactory validateAndSetTransportFactory(String transportFactory)
{
try
{
Class factory = Class.forName(transportFactory);
- if(!TTransportFactory.class.isAssignableFrom(factory))
+ if(!ITransportFactory.class.isAssignableFrom(factory))
throw new IllegalArgumentException(String.format("transport factory '%s' " +
- "not derived from TTransportFactory", transportFactory));
+ "not derived from ITransportFactory", transportFactory));
- return (TTransportFactory) factory.newInstance();
+ return (ITransportFactory) factory.newInstance();
}
catch (Exception e)
{
@@ -494,6 +493,34 @@ public class Session implements Serializable
}
}
+ private void configureTransportFactory(ITransportFactory transportFactory, EncryptionOptions encOptions)
+ {
+ Map<String, String> options = new HashMap<>();
+ // If the supplied factory supports the same set of options as our SSL impl, set those
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
+ options.put(SSLTransportFactory.TRUSTSTORE, encOptions.truststore);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
+ options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, encOptions.truststore_password);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
+ options.put(SSLTransportFactory.PROTOCOL, encOptions.protocol);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
+ options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(encOptions.cipher_suites));
+
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
+ && encOptions.require_client_auth)
+ options.put(SSLTransportFactory.KEYSTORE, encOptions.keystore);
+ if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
+ && encOptions.require_client_auth)
+ options.put(SSLTransportFactory.KEYSTORE_PASSWORD, encOptions.keystore_password);
+
+ // Now check if any of the factory's supported options are set as system properties
+ for (String optionKey : transportFactory.supportedOptions())
+ if (System.getProperty(optionKey) != null)
+ options.put(optionKey, System.getProperty(optionKey));
+
+ transportFactory.setOptions(options);
+ }
+
public int getCardinality()
{
return cardinality;
@@ -748,12 +775,11 @@ public class Session implements Serializable
// random node selection for fake load balancing
String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
- TSocket socket = new TSocket(currentNode, port);
- TTransport transport = transportFactory.getTransport(socket);
- CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
-
try
{
+ TTransport transport = transportFactory.openTransport(currentNode, port);
+ CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
+
if (!transport.isOpen())
transport.open();
@@ -771,6 +797,7 @@ public class Session implements Serializable
AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
client.login(authenticationRequest);
}
+ return client;
}
catch (AuthenticationException e)
{
@@ -788,8 +815,6 @@ public class Session implements Serializable
{
throw new RuntimeException(e.getMessage());
}
-
- return client;
}
public SimpleClient getNativeClient()