You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/01/31 07:39:51 UTC

git commit: Switch stress to use ITransportFactory

Updated Branches:
  refs/heads/cassandra-2.0 36af40925 -> e7b23cc74


Switch stress to use ITransportFactory

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6641


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b23cc7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b23cc7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b23cc7

Branch: refs/heads/cassandra-2.0
Commit: e7b23cc741622ccf8b1487d8622cca47dcb9cc34
Parents: 36af409
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Jan 31 01:38:54 2014 -0500
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jan 31 01:38:54 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/cli/CliOptions.java    | 45 +++++++++++--
 .../org/apache/cassandra/stress/Session.java    | 69 +++++++++++++-------
 3 files changed, 88 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 56a72ef..94b21d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
  * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
  * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
+ * Switch stress to use ITransportFactory (CASSANDRA-6641)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/src/java/org/apache/cassandra/cli/CliOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliOptions.java b/src/java/org/apache/cassandra/cli/CliOptions.java
index 68f17c9..7894bf9 100644
--- a/src/java/org/apache/cassandra/cli/CliOptions.java
+++ b/src/java/org/apache/cassandra/cli/CliOptions.java
@@ -17,9 +17,15 @@
  */
 package org.apache.cassandra.cli;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
 import org.apache.commons.cli.*;
 
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.thrift.SSLTransportFactory;
 
 /**
  *
@@ -114,11 +120,6 @@ public class CliOptions
                 css.hostName = DEFAULT_HOST;
             }
 
-            if (cmd.hasOption(TRANSPORT_FACTORY))
-            {
-                css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
-            }
-
             if (cmd.hasOption(DEBUG_OPTION))
             {
                 css.debug = true;
@@ -217,6 +218,12 @@ public class CliOptions
                 css.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
             }
 
+            if (cmd.hasOption(TRANSPORT_FACTORY))
+            {
+                css.transportFactory = validateAndSetTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
+                configureTransportFactory(css.transportFactory, css.encOptions);
+            }
+
             // Abort if there are any unrecognized arguments left
             if (cmd.getArgs().length > 0)
             {
@@ -281,4 +288,32 @@ public class CliOptions
             throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
         }
     }
+
+    private static void configureTransportFactory(ITransportFactory transportFactory, EncryptionOptions encOptions)
+    {
+        Map<String, String> options = new HashMap<>();
+        // Build the option map: an SSL setting is copied from encOptions only if the factory's supportedOptions() contains that SSLTransportFactory key
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
+            options.put(SSLTransportFactory.TRUSTSTORE, encOptions.truststore); // NOTE(review): value is stored as-is, even if null - confirm setOptions tolerates that
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
+            options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, encOptions.truststore_password);
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
+            options.put(SSLTransportFactory.PROTOCOL, encOptions.protocol);
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
+            options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(encOptions.cipher_suites)); // suites are flattened into one comma-separated string

+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
+                && encOptions.require_client_auth) // keystore path is forwarded only when client auth is required
+            options.put(SSLTransportFactory.KEYSTORE, encOptions.keystore);
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
+                && encOptions.require_client_auth) // same gate for the keystore password
+            options.put(SSLTransportFactory.KEYSTORE_PASSWORD, encOptions.keystore_password);

+        // System properties named after a supported option are applied last, so they overwrite any value taken from encOptions above
+        for (String optionKey : transportFactory.supportedOptions())
+            if (System.getProperty(optionKey) != null)
+                options.put(optionKey, System.getProperty(optionKey));

+        transportFactory.setOptions(options); // hand the assembled map to the factory in a single call
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b23cc7/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index 242fa14..9ac865d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -24,29 +24,25 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.commons.lang3.StringUtils;
 import com.yammer.metrics.Metrics;
 
 import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.cli.transport.FramedTransportFactory;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.commons.cli.*;
-
-import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.thrift.*;
-import org.apache.commons.lang3.StringUtils;
-
+import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportFactory;
 
 public class Session implements Serializable
 {
@@ -175,7 +171,7 @@ public class Session implements Serializable
     public final boolean timeUUIDComparator;
     public double traceProbability = 0.0;
     public EncryptionOptions encOptions = new ClientEncryptionOptions();
-    public TTransportFactory transportFactory = new FramedTransportFactory();
+    public ITransportFactory transportFactory = new TFramedTransportFactory();
 
     public Session(String[] arguments) throws IllegalArgumentException, SyntaxException
     {
@@ -455,7 +451,10 @@ public class Session implements Serializable
                 encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
 
             if (cmd.hasOption("tf"))
+            {
                 transportFactory = validateAndSetTransportFactory(cmd.getOptionValue("tf"));
+                configureTransportFactory(transportFactory, encOptions);
+            }
 
             if (cmd.hasOption("un"))
                 username = cmd.getOptionValue("un");
@@ -476,17 +475,17 @@ public class Session implements Serializable
         sigma = numDifferentKeys * STDev;
     }
 
-    private TTransportFactory validateAndSetTransportFactory(String transportFactory)
+    private ITransportFactory validateAndSetTransportFactory(String transportFactory)
     {
         try
         {
             Class factory = Class.forName(transportFactory);
 
-            if(!TTransportFactory.class.isAssignableFrom(factory))
+            if(!ITransportFactory.class.isAssignableFrom(factory))
                 throw new IllegalArgumentException(String.format("transport factory '%s' " +
-                        "not derived from TTransportFactory", transportFactory));
+                        "not derived from ITransportFactory", transportFactory));
 
-            return (TTransportFactory) factory.newInstance();
+            return (ITransportFactory) factory.newInstance();
         }
         catch (Exception e)
         {
@@ -494,6 +493,34 @@ public class Session implements Serializable
         }
     }
 
+    private void configureTransportFactory(ITransportFactory transportFactory, EncryptionOptions encOptions)
+    {
+        Map<String, String> options = new HashMap<>();
+        // Collect options for the factory: each SSL setting comes from encOptions, but only when supportedOptions() lists the corresponding SSLTransportFactory key
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
+            options.put(SSLTransportFactory.TRUSTSTORE, encOptions.truststore); // NOTE(review): no null check on the encOptions value before it is stored - verify the factory copes
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
+            options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, encOptions.truststore_password);
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
+            options.put(SSLTransportFactory.PROTOCOL, encOptions.protocol);
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
+            options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(encOptions.cipher_suites)); // joined with ',' into a single string value

+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
+                && encOptions.require_client_auth) // keystore is passed along only if require_client_auth is set
+            options.put(SSLTransportFactory.KEYSTORE, encOptions.keystore);
+        if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
+                && encOptions.require_client_auth) // keystore password follows the same condition
+            options.put(SSLTransportFactory.KEYSTORE_PASSWORD, encOptions.keystore_password);

+        // Runs after the encOptions pass: a system property whose name equals a supported option key replaces the value set above
+        for (String optionKey : transportFactory.supportedOptions())
+            if (System.getProperty(optionKey) != null)
+                options.put(optionKey, System.getProperty(optionKey));

+        transportFactory.setOptions(options); // the factory receives the whole map at once
+    }
+
     public int getCardinality()
     {
         return cardinality;
@@ -748,12 +775,11 @@ public class Session implements Serializable
         // random node selection for fake load balancing
         String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
 
-        TSocket socket = new TSocket(currentNode, port);
-        TTransport transport = transportFactory.getTransport(socket);
-        CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
-
         try
         {
+            TTransport transport = transportFactory.openTransport(currentNode, port);
+            CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
+
             if (!transport.isOpen())
                 transport.open();
 
@@ -771,6 +797,7 @@ public class Session implements Serializable
                 AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
                 client.login(authenticationRequest);
             }
+            return client;
         }
         catch (AuthenticationException e)
         {
@@ -788,8 +815,6 @@ public class Session implements Serializable
         {
             throw new RuntimeException(e.getMessage());
         }
-
-        return client;
     }
 
     public SimpleClient getNativeClient()