Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/12/24 03:08:45 UTC

[6/6] git commit: Improve Stress Tool patch by Benedict; reviewed by Pavel Yaskevich for CASSANDRA-6199

Improve Stress Tool
patch by Benedict; reviewed by Pavel Yaskevich for CASSANDRA-6199


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e1e98ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e1e98ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e1e98ad

Branch: refs/heads/trunk
Commit: 2e1e98ad04c81900524763eddf560edc55dfb299
Parents: 34235ad
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Dec 24 00:33:38 2013 +0000
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Dec 23 18:05:43 2013 -0800

----------------------------------------------------------------------
 build.xml                                       |   6 +
 lib/commons-math3-3.2.jar                       | Bin 0 -> 1692782 bytes
 tools/bin/cassandra-stressd                     |  34 +-
 ...2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar | Bin 0 -> 5869229 bytes
 ...cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar | Bin 0 -> 490145 bytes
 .../org/apache/cassandra/stress/Operation.java  | 204 +++++
 .../org/apache/cassandra/stress/Session.java    | 841 -------------------
 .../src/org/apache/cassandra/stress/Stress.java |  63 +-
 .../apache/cassandra/stress/StressAction.java   | 639 +++++++++-----
 .../apache/cassandra/stress/StressMetrics.java  | 178 ++++
 .../apache/cassandra/stress/StressServer.java   |  90 +-
 .../cassandra/stress/StressStatistics.java      | 126 ---
 .../cassandra/stress/generatedata/DataGen.java  |  18 +
 .../stress/generatedata/DataGenBytesRandom.java |  24 +
 .../stress/generatedata/DataGenFactory.java     |   9 +
 .../stress/generatedata/DataGenHex.java         |  39 +
 .../DataGenHexFromDistribution.java             |  45 +
 .../generatedata/DataGenHexFromOpIndex.java     |  27 +
 .../generatedata/DataGenStringDictionary.java   |  84 ++
 .../generatedata/DataGenStringRepeats.java      |  69 ++
 .../stress/generatedata/Distribution.java       |  19 +
 .../generatedata/DistributionBoundApache.java   |  42 +
 .../generatedata/DistributionFactory.java       |  10 +
 .../stress/generatedata/DistributionFixed.java  |  25 +
 .../generatedata/DistributionOffsetApache.java  |  40 +
 .../generatedata/DistributionSeqBatch.java      |  47 ++
 .../cassandra/stress/generatedata/KeyGen.java   |  33 +
 .../cassandra/stress/generatedata/RowGen.java   |  31 +
 .../generatedata/RowGenDistributedSize.java     |  84 ++
 .../stress/operations/CQLOperation.java         |  96 ---
 .../stress/operations/CounterAdder.java         | 141 ----
 .../stress/operations/CounterGetter.java        | 152 ----
 .../stress/operations/CqlCounterAdder.java      |  98 +--
 .../stress/operations/CqlCounterGetter.java     |  98 +--
 .../operations/CqlIndexedRangeSlicer.java       | 224 ++---
 .../stress/operations/CqlInserter.java          | 124 +--
 .../stress/operations/CqlMultiGetter.java       |  19 +-
 .../stress/operations/CqlOperation.java         | 566 +++++++++++++
 .../stress/operations/CqlRangeSlicer.java       |  95 +--
 .../cassandra/stress/operations/CqlReader.java  | 121 +--
 .../stress/operations/IndexedRangeSlicer.java   | 135 ---
 .../cassandra/stress/operations/Inserter.java   | 135 ---
 .../stress/operations/MultiGetter.java          | 152 ----
 .../stress/operations/RangeSlicer.java          | 144 ----
 .../cassandra/stress/operations/Reader.java     | 159 ----
 .../stress/operations/ThriftCounterAdder.java   |  95 +++
 .../stress/operations/ThriftCounterGetter.java  |  75 ++
 .../operations/ThriftIndexedRangeSlicer.java    | 115 +++
 .../stress/operations/ThriftInserter.java       | 117 +++
 .../stress/operations/ThriftMultiGetter.java    |  81 ++
 .../stress/operations/ThriftRangeSlicer.java    |  86 ++
 .../stress/operations/ThriftReader.java         |  76 ++
 .../cassandra/stress/server/StressThread.java   |  77 --
 .../cassandra/stress/settings/CliOption.java    |  58 ++
 .../cassandra/stress/settings/Command.java      | 101 +++
 .../stress/settings/CommandCategory.java        |   8 +
 .../stress/settings/ConnectionAPI.java          |   7 +
 .../stress/settings/ConnectionStyle.java        |   9 +
 .../cassandra/stress/settings/CqlVersion.java   |  48 ++
 .../stress/settings/GroupedOptions.java         | 104 +++
 .../cassandra/stress/settings/Legacy.java       | 369 ++++++++
 .../cassandra/stress/settings/Option.java       |  24 +
 .../stress/settings/OptionDataGen.java          | 177 ++++
 .../stress/settings/OptionDistribution.java     | 340 ++++++++
 .../cassandra/stress/settings/OptionMulti.java  | 107 +++
 .../stress/settings/OptionReplication.java      | 114 +++
 .../cassandra/stress/settings/OptionSimple.java | 131 +++
 .../stress/settings/SettingsColumn.java         | 176 ++++
 .../stress/settings/SettingsCommand.java        | 159 ++++
 .../stress/settings/SettingsCommandMixed.java   | 184 ++++
 .../stress/settings/SettingsCommandMulti.java   |  69 ++
 .../cassandra/stress/settings/SettingsKey.java  | 130 +++
 .../cassandra/stress/settings/SettingsLog.java  |  92 ++
 .../cassandra/stress/settings/SettingsMisc.java | 200 +++++
 .../cassandra/stress/settings/SettingsMode.java | 154 ++++
 .../cassandra/stress/settings/SettingsNode.java | 103 +++
 .../cassandra/stress/settings/SettingsPort.java |  70 ++
 .../cassandra/stress/settings/SettingsRate.java | 116 +++
 .../stress/settings/SettingsSchema.java         | 236 ++++++
 .../stress/settings/SettingsTransport.java      | 121 +++
 .../stress/settings/StressSettings.java         | 239 ++++++
 .../cassandra/stress/util/CassandraClient.java  |  34 -
 .../cassandra/stress/util/JavaDriverClient.java | 148 ++++
 .../apache/cassandra/stress/util/Operation.java | 334 --------
 .../cassandra/stress/util/SampleOfLongs.java    | 107 +++
 .../stress/util/SimpleThriftClient.java         |  90 ++
 .../stress/util/SmartThriftClient.java          | 235 ++++++
 .../cassandra/stress/util/ThriftClient.java     |  36 +
 .../org/apache/cassandra/stress/util/Timer.java | 129 +++
 .../apache/cassandra/stress/util/Timing.java    |  72 ++
 .../cassandra/stress/util/TimingInterval.java   | 132 +++
 .../cassandra/stress/util/Uncertainty.java      |  81 ++
 92 files changed, 7692 insertions(+), 3360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 1ffb908..347323f 100644
--- a/build.xml
+++ b/build.xml
@@ -37,6 +37,7 @@
     <property name="build.src.resources" value="${basedir}/src/resources"/>
     <property name="build.src.gen-java" value="${basedir}/src/gen-java"/>
     <property name="build.lib" value="${basedir}/lib"/>
+    <property name="build.tools.lib" value="${basedir}/tools/lib"/>
     <property name="build.dir" value="${basedir}/build"/>
     <property name="build.dir.lib" value="${basedir}/build/lib"/>
     <property name="build.test.dir" value="${build.dir}/test"/>
@@ -343,6 +344,7 @@
           <dependency groupId="commons-cli" artifactId="commons-cli" version="1.1"/>
           <dependency groupId="commons-codec" artifactId="commons-codec" version="1.2"/>
           <dependency groupId="org.apache.commons" artifactId="commons-lang3" version="3.1"/>
+          <dependency groupId="org.apache.commons" artifactId="commons-math3" version="3.2"/>
           <dependency groupId="com.googlecode.concurrentlinkedhashmap" artifactId="concurrentlinkedhashmap-lru" version="1.3"/>
           <dependency groupId="org.antlr" artifactId="antlr" version="3.2"/>
           <dependency groupId="org.slf4j" artifactId="slf4j-api" version="1.7.2"/>
@@ -453,6 +455,7 @@
         <dependency groupId="commons-cli" artifactId="commons-cli"/>
         <dependency groupId="commons-codec" artifactId="commons-codec"/>
         <dependency groupId="org.apache.commons" artifactId="commons-lang3"/>
+        <dependency groupId="org.apache.commons" artifactId="commons-math3"/>
         <dependency groupId="com.googlecode.concurrentlinkedhashmap" artifactId="concurrentlinkedhashmap-lru"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
         <dependency groupId="org.slf4j" artifactId="slf4j-api"/>
@@ -703,6 +706,9 @@
                     <fileset dir="${build.lib}">
                         <include name="**/*.jar" />
                     </fileset>
+                    <fileset dir="${build.tools.lib}">
+                        <include name="**/*.jar" />
+                    </fileset>
                 </path>
             </classpath>
         </javac>
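
For illustration only, and not part of this commit: the commons-math3 3.2 dependency added above is what backs the new stress generatedata distributions (see DistributionBoundApache / DistributionOffsetApache in the file list). Below is a minimal sketch of the kind of sampling that library provides; the class, names and numbers are invented for the example.

// Illustration only: NOT part of the patch. Shows the kind of sampling the
// commons-math3 3.2 dependency (added above) provides for the new stress
// generatedata distributions. All names and numbers here are invented.
import org.apache.commons.math3.distribution.NormalDistribution;

public class DistributionSamplingSketch
{
    public static void main(String[] args)
    {
        // e.g. column value sizes drawn from a Gaussian centred on 34 bytes
        NormalDistribution columnSize = new NormalDistribution(34, 5);

        for (int i = 0; i < 5; i++)
            System.out.println(Math.max(1, Math.round(columnSize.sample())));

        // quantile lookup: the size below which 99% of sampled values fall
        System.out.println("p99 size: " + columnSize.inverseCumulativeProbability(0.99));
    }
}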

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/lib/commons-math3-3.2.jar
----------------------------------------------------------------------
diff --git a/lib/commons-math3-3.2.jar b/lib/commons-math3-3.2.jar
new file mode 100644
index 0000000..f8b7db2
Binary files /dev/null and b/lib/commons-math3-3.2.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/bin/cassandra-stressd
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra-stressd b/tools/bin/cassandra-stressd
index 8d337e5..9110c5d 100755
--- a/tools/bin/cassandra-stressd
+++ b/tools/bin/cassandra-stressd
@@ -17,23 +17,25 @@
 # limitations under the License.
 
 DESC="Cassandra Stress Test Daemon"
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then 
+    for include in "`dirname $0`/cassandra.in.sh" \
+                   "$HOME/.cassandra.in.sh" \
+                   /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh; do
+        if [ -r $include ]; then 
+            . $include
+            break   
+        fi      
+    done    
+elif [ -r $CASSANDRA_INCLUDE ]; then 
+    . $CASSANDRA_INCLUDE
+fi
 
-if [ "x$CLASSPATH" = "x" ]; then
-    
-    # execute from the build dir.
-    if [ -d `dirname $0`/../../build/classes ]; then
-        for directory in `dirname $0`/../../build/classes/*; do
-            CLASSPATH=$CLASSPATH:$directory
-        done
-    else
-        if [ -f `dirname $0`/../lib/stress.jar ]; then
-            CLASSPATH=`dirname $0`/../lib/stress.jar
-        fi
-    fi
-
-    for jar in `dirname $0`/../../lib/*.jar; do
-        CLASSPATH=$CLASSPATH:$jar
-    done
+if [ -x $JAVA_HOME/bin/java ]; then 
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
 fi
 
 if [ -x $JAVA_HOME/bin/java ]; then

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar
----------------------------------------------------------------------
diff --git a/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar
new file mode 100644
index 0000000..1f4dafd
Binary files /dev/null and b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar
new file mode 100644
index 0000000..c0d4242
Binary files /dev/null and b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
new file mode 100644
index 0000000..fa7a453
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.stress.generatedata.KeyGen;
+import org.apache.cassandra.stress.generatedata.RowGen;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.settings.SettingsCommandMixed;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public abstract class Operation
+{
+    public final long index;
+    protected final State state;
+
+    public Operation(State state, long idx)
+    {
+        index = idx;
+        this.state = state;
+    }
+
+    public static interface RunOp
+    {
+        public boolean run() throws Exception;
+        public String key();
+        public int keyCount();
+    }
+
+    // one per thread!
+    public static final class State
+    {
+
+        public final StressSettings settings;
+        public final Timer timer;
+        public final Command type;
+        public final KeyGen keyGen;
+        public final RowGen rowGen;
+        public final List<ColumnParent> columnParents;
+        public final StressMetrics metrics;
+        public final SettingsCommandMixed.CommandSelector readWriteSelector;
+        private Object cqlCache;
+
+        public State(Command type, StressSettings settings, StressMetrics metrics)
+        {
+            this.type = type;
+            this.timer = metrics.getTiming().newTimer();
+            if (type == Command.MIXED)
+                readWriteSelector = ((SettingsCommandMixed) settings.command).selector();
+            else
+                readWriteSelector = null;
+            this.settings = settings;
+            this.keyGen = settings.keys.newKeyGen();
+            this.rowGen = settings.columns.newRowGen();
+            this.metrics = metrics;
+            if (!settings.columns.useSuperColumns)
+                columnParents = Collections.singletonList(new ColumnParent(settings.schema.columnFamily));
+            else
+            {
+                ColumnParent[] cp = new ColumnParent[settings.columns.superColumns];
+                for (int i = 0 ; i < cp.length ; i++)
+                    cp[i] = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes("S" + i));
+                columnParents = Arrays.asList(cp);
+            }
+        }
+        public boolean isCql3()
+        {
+            return settings.mode.cqlVersion == CqlVersion.CQL3;
+        }
+        public boolean isCql2()
+        {
+            return settings.mode.cqlVersion == CqlVersion.CQL2;
+        }
+        public Object getCqlCache()
+        {
+            return cqlCache;
+        }
+        public void storeCqlCache(Object val)
+        {
+            cqlCache = val;
+        }
+    }
+
+    protected ByteBuffer getKey()
+    {
+        return state.keyGen.getKeys(1, index).get(0);
+    }
+
+    protected List<ByteBuffer> getKeys(int count)
+    {
+        return state.keyGen.getKeys(count, index);
+    }
+
+    protected List<ByteBuffer> generateColumnValues()
+    {
+        return state.rowGen.generate(index);
+    }
+
+    /**
+     * Run operation
+     * @param client Cassandra Thrift client connection
+     * @throws IOException on any I/O error.
+     */
+    public abstract void run(ThriftClient client) throws IOException;
+
+    public void run(SimpleClient client) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void run(JavaDriverClient client) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void timeWithRetry(RunOp run) throws IOException
+    {
+        state.timer.start();
+
+        boolean success = false;
+        String exceptionMessage = null;
+
+        for (int t = 0; t < state.settings.command.tries; t++)
+        {
+            if (success)
+                break;
+
+            try
+            {
+                success = run.run();
+            }
+            catch (Exception e)
+            {
+                System.err.println(e);
+                exceptionMessage = getExceptionMessage(e);
+                success = false;
+            }
+        }
+
+        state.timer.stop(run.keyCount());
+
+        if (!success)
+        {
+            error(String.format("Operation [%d] retried %d times - error executing for key %s %s%n",
+                    index,
+                    state.settings.command.tries,
+                    run.key(),
+                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+        }
+
+    }
+
+    protected String getExceptionMessage(Exception e)
+    {
+        String className = e.getClass().getSimpleName();
+        String message = (e instanceof InvalidRequestException) ? ((InvalidRequestException) e).getWhy() : e.getMessage();
+        return (message == null) ? "(" + className + ")" : String.format("(%s): %s", className, message);
+    }
+
+    protected void error(String message) throws IOException
+    {
+        if (!state.settings.command.ignoreErrors)
+            throw new IOException(message);
+        else
+            System.err.println(message);
+    }
+
+    public static ByteBuffer getColumnNameBytes(int i)
+    {
+        return ByteBufferUtil.bytes("C" + i);
+    }
+
+    public static String getColumnName(int i)
+    {
+        return "C" + i;
+    }
+
+}
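
For illustration only, and not part of this commit: a hypothetical Operation subclass showing how the State / RunOp / timeWithRetry contract defined above is intended to be used. NoopOp, its key format and its no-op body are invented for the example.

// Illustration only: a hypothetical subclass, NOT part of the patch, exercising
// the RunOp / timeWithRetry contract of the Operation class added above.
package org.apache.cassandra.stress;

import java.io.IOException;

import org.apache.cassandra.stress.util.ThriftClient;

class NoopOp extends Operation
{
    NoopOp(State state, long idx)
    {
        super(state, idx);
    }

    @Override
    public void run(ThriftClient client) throws IOException
    {
        // timeWithRetry times the whole attempt loop on state.timer and retries
        // up to settings.command.tries times before reporting an error
        timeWithRetry(new RunOp()
        {
            public boolean run() throws Exception
            {
                return true;            // pretend the request succeeded
            }

            public String key()
            {
                return "op-" + index;   // reported only if every retry fails
            }

            public int keyCount()
            {
                return 1;               // keys credited to the timer on stop()
            }
        });
    }
}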

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
deleted file mode 100644
index 8d138f5..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ /dev/null
@@ -1,841 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.cli.*;
-import org.apache.commons.lang3.StringUtils;
-
-import com.yammer.metrics.Metrics;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.cli.transport.FramedTransportFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportFactory;
-
-public class Session implements Serializable
-{
-    // command line options
-    public static final Options availableOptions = new Options();
-
-    public static final String KEYSPACE_NAME = "Keyspace1";
-    public static final String DEFAULT_COMPARATOR = "AsciiType";
-    public static final String DEFAULT_VALIDATOR  = "BytesType";
-
-    private static InetAddress localInetAddress;
-
-    public final AtomicInteger operations = new AtomicInteger();
-    public final AtomicInteger keys = new AtomicInteger();
-    public final com.yammer.metrics.core.Timer latency = Metrics.newTimer(Session.class, "latency");
-
-    private static final String SSL_TRUSTSTORE = "truststore";
-    private static final String SSL_TRUSTSTORE_PW = "truststore-password";
-    private static final String SSL_PROTOCOL = "ssl-protocol";
-    private static final String SSL_ALGORITHM = "ssl-alg";
-    private static final String SSL_STORE_TYPE = "store-type";
-    private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
-
-    static
-    {
-        availableOptions.addOption("h",  "help",                 false,  "Show this help message and exit");
-        availableOptions.addOption("n",  "num-keys",             true,   "Number of keys, default:1000000");
-        availableOptions.addOption("F",  "num-different-keys",   true,   "Number of different keys (if < NUM-KEYS, the same key will re-used multiple times), default:NUM-KEYS");
-        availableOptions.addOption("N",  "skip-keys",            true,   "Fraction of keys to skip initially, default:0");
-        availableOptions.addOption("t",  "threads",              true,   "Number of threads to use, default:50");
-        availableOptions.addOption("c",  "cells",                true,   "Number of cells per key, default:5");
-        availableOptions.addOption("S",  "column-size",          true,   "Size of column values in bytes, default:34");
-        availableOptions.addOption("C",  "cardinality",          true,   "Number of unique values stored in cells, default:50");
-        availableOptions.addOption("d",  "nodes",                true,   "Host nodes (comma separated), default:locahost");
-        availableOptions.addOption("D",  "nodesfile",            true,   "File containing host nodes (one per line)");
-        availableOptions.addOption("s",  "stdev",                true,   "Standard Deviation Factor, default:0.1");
-        availableOptions.addOption("r",  "random",               false,  "Use random key generator (STDEV will have no effect), default:false");
-        availableOptions.addOption("f",  "file",                 true,   "Write output to given file");
-        availableOptions.addOption("p",  "port",                 true,   "Thrift port, default:9160");
-        availableOptions.addOption("o",  "operation",            true,   "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET), default:INSERT");
-        availableOptions.addOption("u",  "supercolumns",         true,   "Number of super columns per key, default:1");
-        availableOptions.addOption("y",  "family-type",          true,   "Column Family Type (Super, Standard), default:Standard");
-        availableOptions.addOption("K",  "keep-trying",          true,   "Retry on-going operation N times (in case of failure). positive integer, default:10");
-        availableOptions.addOption("k",  "keep-going",           false,  "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false");
-        availableOptions.addOption("i",  "progress-interval",    true,   "Progress Report Interval (seconds), default:10");
-        availableOptions.addOption("g",  "keys-per-call",        true,   "Number of keys to get_range_slices or multiget per call, default:1000");
-        availableOptions.addOption("l",  "replication-factor",   true,   "Replication Factor to use when creating needed column families, default:1");
-        availableOptions.addOption("L",  "enable-cql",           false,  "Perform queries using CQL2 (Cassandra Query Language v 2.0.0)");
-        availableOptions.addOption("L3", "enable-cql3",          false,  "Perform queries using CQL3 (Cassandra Query Language v 3.0.0)");
-        availableOptions.addOption("b",  "enable-native-protocol",  false,  "Use the binary native protocol (only work along with -L3)");
-        availableOptions.addOption("P",  "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL).");
-        availableOptions.addOption("e",  "consistency-level",    true,   "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
-        availableOptions.addOption("x",  "create-index",         true,   "Type of index to create on needed column families (KEYS)");
-        availableOptions.addOption("R",  "replication-strategy", true,   "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy");
-        availableOptions.addOption("O",  "strategy-properties",  true,   "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
-        availableOptions.addOption("W",  "no-replicate-on-write",false,  "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
-        availableOptions.addOption("V",  "average-size-values",  false,  "Generate column values of average rather than specific size");
-        availableOptions.addOption("T",  "send-to",              true,   "Send this as a request to the stress daemon at specified address.");
-        availableOptions.addOption("I",  "compression",          true,   "Specify the compression to use for sstable, default:no compression");
-        availableOptions.addOption("Q",  "query-names",          true,   "Comma-separated list of column names to retrieve from each row.");
-        availableOptions.addOption("Z",  "compaction-strategy",  true,   "CompactionStrategy to use.");
-        availableOptions.addOption("U",  "comparator",           true,   "Cell Comparator to use. Currently supported types are: TimeUUIDType, AsciiType, UTF8Type.");
-        availableOptions.addOption("tf", "transport-factory",    true,   "Fully-qualified TTransportFactory class name for creating a connection. Note: For Thrift over SSL, use org.apache.cassandra.stress.SSLTransportFactory.");
-        availableOptions.addOption("ns", "no-statistics",        false,  "Turn off the aggegate statistics that is normally output after completion.");
-        availableOptions.addOption("ts", SSL_TRUSTSTORE,         true, "SSL: full path to truststore");
-        availableOptions.addOption("tspw", SSL_TRUSTSTORE_PW,    true, "SSL: full path to truststore");
-        availableOptions.addOption("prtcl", SSL_PROTOCOL,        true, "SSL: connections protocol to use (default: TLS)");
-        availableOptions.addOption("alg", SSL_ALGORITHM,         true, "SSL: algorithm (default: SunX509)");
-        availableOptions.addOption("st", SSL_STORE_TYPE,         true, "SSL: type of store");
-        availableOptions.addOption("ciphers", SSL_CIPHER_SUITES, true, "SSL: comma-separated list of encryption suites to use");
-        availableOptions.addOption("th", "throttle",             true, "Throttle the total number of operations per second to a maximum amount.");
-        availableOptions.addOption("un", "username",             true, "Username for authentication.");
-        availableOptions.addOption("pw", "password",             true, "Password for authentication.");
-    }
-
-    private int numKeys          = 1000 * 1000;
-    private int numDifferentKeys = numKeys;
-    private float skipKeys       = 0;
-    private int threads          = 50;
-    private int columns          = 5;
-    private int columnSize       = 34;
-    private int cardinality      = 50;
-    public String[] nodes        = new String[] { "127.0.0.1" };
-    private boolean random       = false;
-    private int retryTimes       = 10;
-    public int port              = 9160;
-    private int superColumns     = 1;
-    private String compression   = null;
-    private String compactionStrategy = null;
-    private String username      = null;
-    private String password      = null;
-
-    private int progressInterval  = 10;
-    private int keysPerCall       = 1000;
-    private boolean replicateOnWrite = true;
-    private boolean ignoreErrors  = false;
-    private boolean enable_cql    = false;
-    private boolean use_prepared  = false;
-    private boolean trace         = false;
-    private boolean captureStatistics = true;
-    public boolean use_native_protocol = false;
-    private double maxOpsPerSecond = Double.MAX_VALUE;
-
-    private final String outFileName;
-
-    private IndexType indexType = null;
-    private Stress.Operations operation = Stress.Operations.INSERT;
-    private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
-    private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
-    private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
-    private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
-
-    // if we know exactly column names that we want to read (set by -Q option)
-    public final List<ByteBuffer> columnNames;
-
-    public String cqlVersion;
-
-    public final boolean averageSizeValues;
-
-    // required by Gaussian distribution.
-    protected int   mean;
-    protected float sigma;
-
-    public final InetAddress sendToDaemon;
-    public final String comparator;
-    public final boolean timeUUIDComparator;
-    public double traceProbability = 0.0;
-    public EncryptionOptions encOptions = new ClientEncryptionOptions();
-    public TTransportFactory transportFactory = new FramedTransportFactory();
-
-    public Session(String[] arguments) throws IllegalArgumentException, SyntaxException
-    {
-        float STDev = 0.1f;
-        CommandLineParser parser = new PosixParser();
-
-        try
-        {
-            CommandLine cmd = parser.parse(availableOptions, arguments);
-
-            if (cmd.getArgs().length > 0)
-            {
-                System.err.println("Application does not allow arbitrary arguments: " + StringUtils.join(cmd.getArgList(), ", "));
-                System.exit(1);
-            }
-
-            if (cmd.hasOption("h"))
-                throw new IllegalArgumentException("help");
-
-            if (cmd.hasOption("n"))
-                numKeys = Integer.parseInt(cmd.getOptionValue("n"));
-
-            if (cmd.hasOption("F"))
-                numDifferentKeys = Integer.parseInt(cmd.getOptionValue("F"));
-            else
-                numDifferentKeys = numKeys;
-
-            if (cmd.hasOption("N"))
-                skipKeys = Float.parseFloat(cmd.getOptionValue("N"));
-
-            if (cmd.hasOption("t"))
-                threads = Integer.parseInt(cmd.getOptionValue("t"));
-
-            if (cmd.hasOption("c"))
-                columns = Integer.parseInt(cmd.getOptionValue("c"));
-
-            if (cmd.hasOption("S"))
-                columnSize = Integer.parseInt(cmd.getOptionValue("S"));
-
-            if (cmd.hasOption("C"))
-                cardinality = Integer.parseInt(cmd.getOptionValue("C"));
-
-            if (cmd.hasOption("d"))
-                nodes = cmd.getOptionValue("d").split(",");
-
-            if (cmd.hasOption("D"))
-            {
-                try
-                {
-                    String node;
-                    List<String> tmpNodes = new ArrayList<String>();
-                    BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(cmd.getOptionValue("D"))));
-                    try
-                    {
-                        while ((node = in.readLine()) != null)
-                        {
-                            if (node.length() > 0)
-                                tmpNodes.add(node);
-                        }
-                        nodes = tmpNodes.toArray(new String[tmpNodes.size()]);
-                    }
-                    finally
-                    {
-                        in.close();
-                    }
-                }
-                catch(IOException ioe)
-                {
-                    throw new RuntimeException(ioe);
-                }
-            }
-
-            if (cmd.hasOption("s"))
-                STDev = Float.parseFloat(cmd.getOptionValue("s"));
-
-            if (cmd.hasOption("r"))
-                random = true;
-
-            outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null;
-
-            if (cmd.hasOption("p"))
-                port = Integer.parseInt(cmd.getOptionValue("p"));
-
-            if (cmd.hasOption("o"))
-                operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase());
-
-            if (cmd.hasOption("u"))
-                superColumns = Integer.parseInt(cmd.getOptionValue("u"));
-
-            if (cmd.hasOption("y"))
-                columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y"));
-
-            if (cmd.hasOption("K"))
-            {
-                retryTimes = Integer.valueOf(cmd.getOptionValue("K"));
-
-                if (retryTimes <= 0)
-                {
-                    throw new RuntimeException("--keep-trying option value should be > 0");
-                }
-            }
-
-            if (cmd.hasOption("k"))
-            {
-                retryTimes = 1;
-                ignoreErrors = true;
-            }
-
-
-            if (cmd.hasOption("i"))
-                progressInterval = Integer.parseInt(cmd.getOptionValue("i"));
-
-            if (cmd.hasOption("g"))
-                keysPerCall = Integer.parseInt(cmd.getOptionValue("g"));
-
-            if (cmd.hasOption("th"))
-                maxOpsPerSecond = Double.parseDouble(cmd.getOptionValue("th"));
-
-            if (cmd.hasOption("e"))
-                consistencyLevel = ConsistencyLevel.valueOf(cmd.getOptionValue("e").toUpperCase());
-
-            if (cmd.hasOption("x"))
-                indexType = IndexType.valueOf(cmd.getOptionValue("x").toUpperCase());
-
-            if (cmd.hasOption("R"))
-                replicationStrategy = cmd.getOptionValue("R");
-
-            if (cmd.hasOption("l"))
-                replicationStrategyOptions.put("replication_factor", String.valueOf(Integer.parseInt(cmd.getOptionValue("l"))));
-            else if (replicationStrategy.endsWith("SimpleStrategy"))
-                replicationStrategyOptions.put("replication_factor", "1");
-
-            if (cmd.hasOption("L"))
-            {
-                enable_cql = true;
-                cqlVersion = "2.0.0";
-            }
-
-            if (cmd.hasOption("L3"))
-            {
-                enable_cql = true;
-                cqlVersion = "3.0.0";
-            }
-
-            if (cmd.hasOption("b"))
-            {
-                if (!(enable_cql && cqlVersion.startsWith("3")))
-                    throw new IllegalArgumentException("Cannot use binary protocol without -L3");
-                use_native_protocol = true;
-            }
-
-            if (cmd.hasOption("P"))
-            {
-                if (!enable_cql)
-                {
-                    System.err.println("-P/--use-prepared-statements is only applicable with CQL (-L/--enable-cql)");
-                    System.exit(-1);
-                }
-                use_prepared = true;
-            }
-
-            if (cmd.hasOption("O"))
-            {
-                String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ',');
-
-                for (String pair : pairs)
-                {
-                    String[] keyAndValue = StringUtils.split(pair, ':');
-
-                    if (keyAndValue.length != 2)
-                        throw new RuntimeException("Invalid --strategy-properties value.");
-
-                    replicationStrategyOptions.put(keyAndValue[0], keyAndValue[1]);
-                }
-            }
-
-            if (cmd.hasOption("W"))
-                replicateOnWrite = false;
-
-            if (cmd.hasOption("I"))
-                compression = cmd.getOptionValue("I");
-
-            averageSizeValues = cmd.hasOption("V");
-
-            try
-            {
-                sendToDaemon = cmd.hasOption("send-to")
-                                ? InetAddress.getByName(cmd.getOptionValue("send-to"))
-                                : null;
-            }
-            catch (UnknownHostException e)
-            {
-                throw new RuntimeException(e);
-            }
-
-            if (cmd.hasOption("Q"))
-            {
-                AbstractType comparator = TypeParser.parse(DEFAULT_COMPARATOR);
-
-                String[] names = StringUtils.split(cmd.getOptionValue("Q"), ",");
-                columnNames = new ArrayList<ByteBuffer>(names.length);
-
-                for (String columnName : names)
-                    columnNames.add(comparator.fromString(columnName));
-            }
-            else
-            {
-                columnNames = null;
-            }
-
-            if (cmd.hasOption("Z"))
-            {
-                compactionStrategy = cmd.getOptionValue("Z");
-
-                try
-                {
-                    // validate compaction strategy class
-                    CFMetaData.createCompactionStrategy(compactionStrategy);
-                }
-                catch (ConfigurationException e)
-                {
-                    System.err.println(e.getMessage());
-                    System.exit(1);
-                }
-            }
-
-            if (cmd.hasOption("U"))
-            {
-                AbstractType parsed = null;
-
-                try
-                {
-                    parsed = TypeParser.parse(cmd.getOptionValue("U"));
-                }
-                catch (ConfigurationException e)
-                {
-                    System.err.println(e.getMessage());
-                    System.exit(1);
-                }
-
-                comparator = cmd.getOptionValue("U");
-                timeUUIDComparator = parsed instanceof TimeUUIDType;
-
-                if (!(parsed instanceof TimeUUIDType || parsed instanceof AsciiType || parsed instanceof UTF8Type))
-                {
-                    System.err.println("Currently supported types are: TimeUUIDType, AsciiType, UTF8Type.");
-                    System.exit(1);
-                }
-            }
-            else
-            {
-                comparator = null;
-                timeUUIDComparator = false;
-            }
-
-            if (cmd.hasOption("ns"))
-            {
-                captureStatistics = false;
-            }
-
-            if(cmd.hasOption(SSL_TRUSTSTORE))
-                encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
-
-            if(cmd.hasOption(SSL_TRUSTSTORE_PW))
-                encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
-
-            if(cmd.hasOption(SSL_PROTOCOL))
-                encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
-
-            if(cmd.hasOption(SSL_ALGORITHM))
-                encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
-
-            if(cmd.hasOption(SSL_STORE_TYPE))
-                encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
-
-            if(cmd.hasOption(SSL_CIPHER_SUITES))
-                encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
-
-            if (cmd.hasOption("tf"))
-                transportFactory = validateAndSetTransportFactory(cmd.getOptionValue("tf"));
-
-            if (cmd.hasOption("un"))
-                username = cmd.getOptionValue("un");
-
-            if (cmd.hasOption("pw"))
-                password = cmd.getOptionValue("pw");
-        }
-        catch (ParseException e)
-        {
-            throw new IllegalArgumentException(e.getMessage(), e);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-
-        mean  = numDifferentKeys / 2;
-        sigma = numDifferentKeys * STDev;
-    }
-
-    private TTransportFactory validateAndSetTransportFactory(String transportFactory)
-    {
-        try
-        {
-            Class factory = Class.forName(transportFactory);
-
-            if(!TTransportFactory.class.isAssignableFrom(factory))
-                throw new IllegalArgumentException(String.format("transport factory '%s' " +
-                        "not derived from TTransportFactory", transportFactory));
-
-            return (TTransportFactory) factory.newInstance();
-        }
-        catch (Exception e)
-        {
-            throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
-        }
-    }
-
-    public int getCardinality()
-    {
-        return cardinality;
-    }
-
-    public int getColumnSize()
-    {
-        return columnSize;
-    }
-
-    public int getColumnsPerKey()
-    {
-        return columns;
-    }
-
-    public ColumnFamilyType getColumnFamilyType()
-    {
-        return columnFamilyType;
-    }
-
-    public int getNumKeys()
-    {
-        return numKeys;
-    }
-
-    public int getNumDifferentKeys()
-    {
-        return numDifferentKeys;
-    }
-
-    public int getThreads()
-    {
-        return threads;
-    }
-
-    public double getMaxOpsPerSecond()
-    {
-        return maxOpsPerSecond;
-    }
-
-    public float getSkipKeys()
-    {
-        return skipKeys;
-    }
-
-    public int getSuperColumns()
-    {
-        return superColumns;
-    }
-
-    public int getKeysPerThread()
-    {
-        return numKeys / threads;
-    }
-
-    public int getTotalKeysLength()
-    {
-        return Integer.toString(numDifferentKeys).length();
-    }
-
-    public ConsistencyLevel getConsistencyLevel()
-    {
-        return consistencyLevel;
-    }
-
-    public int getRetryTimes()
-    {
-        return retryTimes;
-    }
-
-    public boolean ignoreErrors()
-    {
-        return ignoreErrors;
-    }
-
-    public Stress.Operations getOperation()
-    {
-        return operation;
-    }
-
-    public PrintStream getOutputStream()
-    {
-        try
-        {
-            return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName));
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    public int getProgressInterval()
-    {
-        return progressInterval;
-    }
-
-    public boolean useRandomGenerator()
-    {
-        return random;
-    }
-
-    public int getKeysPerCall()
-    {
-        return keysPerCall;
-    }
-
-    // required by Gaussian distribution
-    public int getMean()
-    {
-        return mean;
-    }
-
-    // required by Gaussian distribution
-    public float getSigma()
-    {
-        return sigma;
-    }
-
-    public boolean isCQL()
-    {
-        return enable_cql;
-    }
-
-    public boolean usePreparedStatements()
-    {
-        return use_prepared;
-    }
-
-    public boolean outputStatistics()
-    {
-        return captureStatistics;
-    }
-
-    /**
-     * Create Keyspace with Standard and Super/Counter column families
-     */
-    public void createKeySpaces()
-    {
-        KsDef keyspace = new KsDef();
-        String defaultComparator = comparator == null ? DEFAULT_COMPARATOR : comparator;
-
-        // column family for standard columns
-        CfDef standardCfDef = new CfDef(KEYSPACE_NAME, "Standard1");
-        Map<String, String> compressionOptions = new HashMap<String, String>();
-        if (compression != null)
-            compressionOptions.put("sstable_compression", compression);
-
-        standardCfDef.setComparator_type(defaultComparator)
-                     .setDefault_validation_class(DEFAULT_VALIDATOR)
-                     .setCompression_options(compressionOptions);
-
-        if (!timeUUIDComparator)
-        {
-            for (int i = 0; i < getColumnsPerKey(); i++)
-            {
-                standardCfDef.addToColumn_metadata(new ColumnDef(ByteBufferUtil.bytes("C" + i), "BytesType"));
-            }
-        }
-
-        if (indexType != null)
-        {
-            ColumnDef standardColumn = new ColumnDef(ByteBufferUtil.bytes("C1"), "BytesType");
-            standardColumn.setIndex_type(indexType).setIndex_name("Idx1");
-            standardCfDef.setColumn_metadata(Arrays.asList(standardColumn));
-        }
-
-        // column family with super columns
-        CfDef superCfDef = new CfDef(KEYSPACE_NAME, "Super1").setColumn_type("Super");
-        superCfDef.setComparator_type(DEFAULT_COMPARATOR)
-                  .setSubcomparator_type(defaultComparator)
-                  .setDefault_validation_class(DEFAULT_VALIDATOR)
-                  .setCompression_options(compressionOptions);
-
-        // column family for standard counters
-        CfDef counterCfDef = new CfDef(KEYSPACE_NAME, "Counter1").setComparator_type(defaultComparator)
-                                                                 .setComparator_type(defaultComparator)
-                                                                 .setDefault_validation_class("CounterColumnType")
-                                                                 .setReplicate_on_write(replicateOnWrite)
-                                                                 .setCompression_options(compressionOptions);
-
-        // column family with counter super columns
-        CfDef counterSuperCfDef = new CfDef(KEYSPACE_NAME, "SuperCounter1").setComparator_type(defaultComparator)
-                                                                           .setDefault_validation_class("CounterColumnType")
-                                                                           .setReplicate_on_write(replicateOnWrite)
-                                                                           .setColumn_type("Super")
-                                                                           .setCompression_options(compressionOptions);
-
-        keyspace.setName(KEYSPACE_NAME);
-        keyspace.setStrategy_class(replicationStrategy);
-
-        if (!replicationStrategyOptions.isEmpty())
-        {
-            keyspace.setStrategy_options(replicationStrategyOptions);
-        }
-
-        if (compactionStrategy != null)
-        {
-            standardCfDef.setCompaction_strategy(compactionStrategy);
-            superCfDef.setCompaction_strategy(compactionStrategy);
-            counterCfDef.setCompaction_strategy(compactionStrategy);
-            counterSuperCfDef.setCompaction_strategy(compactionStrategy);
-        }
-
-        keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef, counterCfDef, counterSuperCfDef)));
-
-        CassandraClient client = getClient(false);
-
-        try
-        {
-            client.system_add_keyspace(keyspace);
-
-            /* CQL3 counter cf */
-            client.set_cql_version("3.0.0"); // just to create counter cf for cql3
-
-            client.set_keyspace(KEYSPACE_NAME);
-            client.execute_cql3_query(createCounterCFStatementForCQL3(), Compression.NONE, ConsistencyLevel.ONE);
-
-            if (enable_cql)
-                client.set_cql_version(cqlVersion);
-            /* end */
-
-            System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
-            Thread.sleep(nodes.length * 1000); // seconds
-        }
-        catch (InvalidRequestException e)
-        {
-            System.err.println("Unable to create stress keyspace: " + e.getWhy());
-        }
-        catch (Exception e)
-        {
-            System.err.println(e.getMessage());
-        }
-    }
-
-    /**
-     * Thrift client connection with Keyspace1 set.
-     * @return cassandra client connection
-     */
-    public CassandraClient getClient()
-    {
-        return getClient(true);
-    }
-
-    /**
-     * Thrift client connection
-     * @param setKeyspace - should we set keyspace for client or not
-     * @return cassandra client connection
-     */
-    public CassandraClient getClient(boolean setKeyspace)
-    {
-        // random node selection for fake load balancing
-        String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
-
-        TSocket socket = new TSocket(currentNode, port);
-        TTransport transport = transportFactory.getTransport(socket);
-        CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
-
-        try
-        {
-            if (!transport.isOpen())
-                transport.open();
-
-            if (enable_cql)
-                client.set_cql_version(cqlVersion);
-
-            if (setKeyspace)
-                client.set_keyspace("Keyspace1");
-
-            if (username != null && password != null)
-            {
-                Map<String, String> credentials = new HashMap<String, String>();
-                credentials.put(IAuthenticator.USERNAME_KEY, username);
-                credentials.put(IAuthenticator.PASSWORD_KEY, password);
-                AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
-                client.login(authenticationRequest);
-            }
-        }
-        catch (AuthenticationException e)
-        {
-            throw new RuntimeException(e.getWhy());
-        }
-        catch (AuthorizationException e)
-        {
-            throw new RuntimeException(e.getWhy());
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e.getWhy());
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e.getMessage());
-        }
-
-        return client;
-    }
-
-    public SimpleClient getNativeClient()
-    {
-        try
-        {
-            String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
-            SimpleClient client = new SimpleClient(currentNode, 9042);
-            client.connect(false);
-            client.execute("USE \"Keyspace1\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
-            return client;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    public static InetAddress getLocalAddress()
-    {
-        if (localInetAddress == null)
-        {
-            try
-            {
-                localInetAddress = InetAddress.getLocalHost();
-            }
-            catch (UnknownHostException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        return localInetAddress;
-    }
-
-    private ByteBuffer createCounterCFStatementForCQL3()
-    {
-        StringBuilder counter3 = new StringBuilder("CREATE TABLE \"Counter3\" (KEY blob PRIMARY KEY, ");
-
-        for (int i = 0; i < getColumnsPerKey(); i++)
-        {
-            counter3.append("c").append(i).append(" counter");
-            if (i != getColumnsPerKey() - 1)
-                counter3.append(", ");
-        }
-        counter3.append(");");
-
-        return ByteBufferUtil.bytes(counter3.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/Stress.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Stress.java b/tools/stress/src/org/apache/cassandra/stress/Stress.java
index 738a1c0..38af4f6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Stress.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Stress.java
@@ -17,48 +17,65 @@
  */
 package org.apache.cassandra.stress;
 
-import org.apache.commons.cli.Option;
-
 import java.io.*;
 import java.net.Socket;
 import java.net.SocketException;
-import java.util.Random;
+
+import org.apache.cassandra.stress.settings.StressSettings;
 
 public final class Stress
 {
-    public static enum Operations
-    {
-        INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET
-    }
 
-    public static Session session;
-    public static Random randomizer = new Random();
+    /**
+     * Known issues:
+     * - uncertainty/stderr assumes op-rates are normally distributed. Due to GC (and possibly latency stepping from
+     * different media, though the variance of request ratio across media should be normally distributed), they are not.
+     * Should attempt to account for pauses in stderr calculation, possibly by assuming these pauses are a separate
+     * normally distributed occurrence
+     * - Under very mixed work loads, the uncertainty calculations and op/s reporting really don't mean much. Should
+     * consider breaking op/s down per workload, or should have a lower-bound on inspection interval based on clustering
+     * of operations and thread count.
+     *
+     *
+     * Future improvements:
+     * - Configurable connection compression
+     * - Java driver support
+     * - Per column data generators
+     * - Automatic column/schema detection if provided with a CF
+     * - target rate produces a very steady work rate, and if we want to simulate a real op rate for an
+     *   application we should have some variation in the actual op rate within any time-slice.
+     * - auto rate should vary the thread count based on performance improvement, potentially starting on a very low
+     *   thread count with a high error rate / low count to get some basic numbers
+     */
+
     private static volatile boolean stopped = false;
 
     public static void main(String[] arguments) throws Exception
     {
+        final StressSettings settings;
         try
         {
-            session = new Session(arguments);
+            settings = StressSettings.parse(arguments);
         }
         catch (IllegalArgumentException e)
         {
             printHelpMessage();
+            e.printStackTrace();
             return;
         }
 
-        PrintStream outStream = session.getOutputStream();
+        PrintStream logout = settings.log.getOutput();
 
-        if (session.sendToDaemon != null)
+        if (settings.sendToDaemon != null)
         {
-            Socket socket = new Socket(session.sendToDaemon, 2159);
+            Socket socket = new Socket(settings.sendToDaemon, 2159);
 
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
             BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
 
             Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out));
 
-            out.writeObject(session);
+            out.writeObject(settings);
 
             String line;
 
@@ -72,7 +89,7 @@ public final class Stress
                         break;
                     }
 
-                    outStream.println(line);
+                    logout.println(line);
                 }
             }
             catch (SocketException e)
@@ -88,10 +105,8 @@ public final class Stress
         }
         else
         {
-            StressAction stressAction = new StressAction(session, outStream);
-            stressAction.start();
-            stressAction.join();
-            System.exit(stressAction.getReturnCode());
+            StressAction stressAction = new StressAction(settings, logout);
+            stressAction.run();
         }
     }
 
@@ -100,15 +115,7 @@ public final class Stress
      */
     public static void printHelpMessage()
     {
-        System.out.println("Usage: ./bin/cassandra-stress [options]\n\nOptions:");
-
-        for(Object o : Session.availableOptions.getOptions())
-        {
-            Option option = (Option) o;
-            String upperCaseName = option.getLongOpt().toUpperCase();
-            System.out.println(String.format("-%s%s, --%s%s%n\t\t%s%n", option.getOpt(), (option.hasArg()) ? " "+upperCaseName : "",
-                                                            option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
-        }
+        StressSettings.printHelp();
     }
 
     private static class ShutDown extends Thread

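The uncertainty caveat noted in the Stress class comment above concerns the relative standard error of the mean of the per-interval op-rate samples, which is presumably what StressMetrics.waitUntilConverges (used in StressAction below) measures. A minimal, self-contained sketch of that kind of convergence check; the class name, sample values and the 2% threshold are illustrative only, not taken from StressMetrics:

    import java.util.Arrays;
    import java.util.List;

    // Illustrative helper: relative standard error of the mean of interval op-rates.
    final class UncertaintySketch
    {
        static double relativeStdErr(List<Double> opRates)
        {
            double mean = 0;
            for (double r : opRates)
                mean += r / opRates.size();
            double variance = 0;
            for (double r : opRates)
                variance += (r - mean) * (r - mean) / (opRates.size() - 1);
            // standard error of the mean, expressed relative to the mean itself
            return Math.sqrt(variance / opRates.size()) / mean;
        }

        public static void main(String[] args)
        {
            List<Double> samples = Arrays.asList(10500d, 9800d, 10200d, 10100d);
            // e.g. treat the rate as converged once the relative stderr drops below 2%
            System.out.println(relativeStdErr(samples) < 0.02);
        }
    }

As the class comment says, this treats the samples as normally distributed, which GC pauses violate in practice.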
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 7098d0b..0312093 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -17,322 +17,527 @@
  */
 package org.apache.cassandra.stress;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.common.util.concurrent.RateLimiter;
-import com.yammer.metrics.stats.Snapshot;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.stress.settings.*;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.transport.SimpleClient;
 
-public class StressAction extends Thread
+public class StressAction implements Runnable
 {
-    /**
-     * Producer-Consumer model: 1 producer, N consumers
-     */
-    private final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
 
-    private final Session client;
+    private final StressSettings settings;
     private final PrintStream output;
 
-    private volatile boolean stop = false;
-
-    public static final int SUCCESS = 0;
-    public static final int FAILURE = 1;
-
-    private volatile int returnCode = -1;
-
-    public StressAction(Session session, PrintStream out)
+    public StressAction(StressSettings settings, PrintStream out)
     {
-        client = session;
+        this.settings = settings;
         output = out;
     }
 
     public void run()
     {
-        Snapshot latency;
-        long oldLatency;
-        int epoch, total, oldTotal, keyCount, oldKeyCount;
-
         // creating keyspace and column families
-        if (client.getOperation() == Stress.Operations.INSERT || client.getOperation() == Stress.Operations.COUNTER_ADD)
-            client.createKeySpaces();
+        settings.maybeCreateKeyspaces();
 
-        int threadCount = client.getThreads();
-        Consumer[] consumers = new Consumer[threadCount];
+        warmup(settings.command.type, settings.command);
 
-        output.println("total,interval_op_rate,interval_key_rate,latency,95th,99.9th,elapsed_time");
+        output.println("Sleeping 2s...");
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
-        int itemsPerThread = client.getKeysPerThread();
-        int modulo = client.getNumKeys() % threadCount;
-        RateLimiter rateLimiter = RateLimiter.create(client.getMaxOpsPerSecond());
+        boolean success;
+        if (settings.rate.auto)
+            success = runAuto();
+        else
+            success = null != run(settings.command.type, settings.rate.threadCount, settings.command.count, output);
 
-        // creating required type of the threads for the test
-        for (int i = 0; i < threadCount; i++) {
-            if (i == threadCount - 1)
-                itemsPerThread += modulo; // last one is going to handle N + modulo items
+        if (success)
+            output.println("END");
+        else
+            output.println("FAILURE");
 
-            consumers[i] = new Consumer(itemsPerThread, rateLimiter);
-        }
+        settings.disconnect();
+    }
 
-        Producer producer = new Producer();
-        producer.start();
+    // type is provided separately so that the mixed command can recurse with each command type it performs
+    private void warmup(Command type, SettingsCommand command)
+    {
+        // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
+        PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
+        int iterations;
+        switch (type.category)
+        {
+            case BASIC:
+                iterations = 50000;
+                break;
+            case MIXED:
+                for (Command subtype : ((SettingsCommandMixed) command).getCommands())
+                    warmup(subtype, command);
+                return;
+            case MULTI:
+                int keysAtOnce = ((SettingsCommandMulti) command).keysAtOnce;
+                iterations = Math.min(50000, (int) Math.ceil(500000d / keysAtOnce));
+                break;
+            default:
+                throw new IllegalStateException();
+        }
+        output.println(String.format("Warming up %s with %d iterations...", type, iterations));
+        run(type, 20, iterations, warmupOutput);
+    }
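For the MULTI case, the warmup length shrinks as each operation touches more keys, so the total number of keys exercised stays roughly constant; for example, with a hypothetical keysAtOnce of 50:

    int keysAtOnce = 50; // hypothetical value of ((SettingsCommandMulti) command).keysAtOnce
    int iterations = Math.min(50000, (int) Math.ceil(500000d / keysAtOnce)); // = min(50000, 10000) = 10000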
 
-        // starting worker threads
-        for (int i = 0; i < threadCount; i++)
-            consumers[i].start();
+    // TODO : permit varying more than just thread count
+    // TODO : vary thread count based on percentage improvement of previous increment, not by fixed amounts
+    private boolean runAuto()
+    {
+        int prevThreadCount = -1;
+        int threadCount = settings.rate.minAutoThreads;
+        List<StressMetrics> results = new ArrayList<>();
+        List<String> runIds = new ArrayList<>();
+        do
+        {
+            output.println(String.format("Running with %d threads", threadCount));
 
-        // initialization of the values
-        boolean terminate = false;
-        epoch = total = keyCount = 0;
+            StressMetrics result = run(settings.command.type, threadCount, settings.command.count, output);
+            if (result == null)
+                return false;
+            results.add(result);
 
-        int interval = client.getProgressInterval();
-        int epochIntervals = client.getProgressInterval() * 10;
-        long testStartTime = System.nanoTime();
-        
-        StressStatistics stats = new StressStatistics(client, output);
+            if (prevThreadCount > 0)
+                output.println(String.format("Improvement over %d threads: %.0f%%",
+                        prevThreadCount, 100 * averageImprovement(results, 1)));
 
-        while (!terminate)
-        {
-            if (stop)
-            {
-                producer.stopProducer();
-
-                for (Consumer consumer : consumers)
-                    consumer.stopConsume();
+            runIds.add(threadCount + " threads");
+            prevThreadCount = threadCount;
+            if (threadCount < 16)
+                threadCount *= 2;
+            else
+                threadCount *= 1.5;
 
+            if (!results.isEmpty() && threadCount > settings.rate.maxAutoThreads)
                 break;
+
+            if (settings.command.type.updates)
+            {
+                // pause for an arbitrary period to let the commit log flush, etc.; this shouldn't make much difference
+                // as we only increase load, never decrease it
+                output.println("Sleeping for 15s");
+                try
+                {
+                    Thread.sleep(15 * 1000);
+                } catch (InterruptedException e)
+                {
+                    return false;
+                }
             }
+            // keep running while throughput is still improving significantly over the previous runs
+        } while (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty));
 
-            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        // summarise all results
+        StressMetrics.summarise(runIds, results, output);
+        return true;
+    }
 
-            int alive = 0;
-            for (Thread thread : consumers)
-                if (thread.isAlive()) alive++;
+    private boolean hasAverageImprovement(List<StressMetrics> results, int count, double minImprovement)
+    {
+        if (results.size() < count + 1)
+            return true;
+        return averageImprovement(results, count) >= minImprovement;
+    }
 
-            if (alive == 0)
-                terminate = true;
+    private double averageImprovement(List<StressMetrics> results, int count)
+    {
+        double improvement = 0;
+        for (int i = results.size() - count ; i < results.size() ; i++)
+        {
+            double prev = results.get(i - 1).getTiming().getHistory().realOpRate();
+            double cur = results.get(i).getTiming().getHistory().realOpRate();
+            improvement += (cur - prev) / prev;
+        }
+        return improvement / count;
+    }
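averageImprovement is the mean of the run-on-run relative gains over the last count runs; with illustrative measured op-rates of 10,000, 12,000 and 12,600 ops/s and count = 2 it works out as:

    // (12000 - 10000) / 10000 = 0.20, and (12600 - 12000) / 12000 = 0.05
    double improvement = (0.20 + 0.05) / 2; // = 0.125, i.e. a 12.5% average improvement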
 
-            epoch++;
+    private StressMetrics run(Command type, int threadCount, long opCount, PrintStream output)
+    {
 
-            if (terminate || epoch > epochIntervals)
-            {
-                epoch = 0;
+        output.println(String.format("Running %s with %d threads %s",
+                type.toString(),
+                threadCount,
+                opCount > 0 ? "for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
+        final WorkQueue workQueue;
+        if (opCount < 0)
+            workQueue = new ContinuousWorkQueue(50);
+        else
+            workQueue = FixedWorkQueue.build(opCount);
+
+        RateLimiter rateLimiter = null;
+        // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
+        if (settings.rate.opRateTargetPerSecond > 0)
+            rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond);
+
+        final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis);
+
+        final CountDownLatch done = new CountDownLatch(threadCount);
+        final Consumer[] consumers = new Consumer[threadCount];
+        for (int i = 0; i < threadCount; i++)
+            consumers[i] = new Consumer(type, done, workQueue, metrics, rateLimiter);
 
-                oldTotal = total;
-                oldKeyCount = keyCount;
+        // start the worker threads
+        for (int i = 0; i < threadCount; i++)
+            consumers[i].start();
 
-                total = client.operations.get();
-                keyCount = client.keys.get();
-                latency = client.latency.getSnapshot();
+        metrics.start();
 
-                int opDelta = total - oldTotal;
-                int keyDelta = keyCount - oldKeyCount;
+        if (opCount <= 0)
+        {
+            try
+            {
+                metrics.waitUntilConverges(settings.command.targetUncertainty,
+                        settings.command.minimumUncertaintyMeasurements,
+                        settings.command.maximumUncertaintyMeasurements);
+            } catch (InterruptedException e) { }
+            workQueue.stop();
+        }
 
-                long currentTimeInSeconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - testStartTime);
+        try
+        {
+            done.await();
+            metrics.stop();
+        } catch (InterruptedException e) {}
 
-                output.println(String.format("%d,%d,%d,%.1f,%.1f,%.1f,%d",
-                                             total,
-                                             opDelta / interval,
-                                             keyDelta / interval,
-                                             latency.getMedian(), latency.get95thPercentile(), latency.get999thPercentile(),
-                                             currentTimeInSeconds));
+        if (metrics.wasCancelled())
+            return null;
 
-                if (client.outputStatistics()) {
-                    stats.addIntervalStats(total, 
-                                           opDelta / interval, 
-                                           keyDelta / interval, 
-                                           latency, 
-                                           currentTimeInSeconds);
-                        }
-            }
-        }
+        metrics.summarise();
 
-        // if any consumer failed, set the return code to failure.
-        returnCode = SUCCESS;
-        if (producer.isAlive())
-        {
-            producer.interrupt(); // if producer is still alive it means that we had errors in the consumers
-            returnCode = FAILURE;
-        }
+        boolean success = true;
         for (Consumer consumer : consumers)
-            if (consumer.getReturnCode() == FAILURE)
-                returnCode = FAILURE;
-
-        if (returnCode == SUCCESS) {            
-            if (client.outputStatistics())
-                stats.printStats();
-            // marking an end of the output to the client
-            output.println("END");            
-        } else {
-            output.println("FAILURE");
-        }
+            success &= consumer.success;
 
-    }
+        if (!success)
+            return null;
 
-    public int getReturnCode()
-    {
-        return returnCode;
+        return metrics;
     }
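The TODO in run() about gating progress on a poisson (or configurable) distribution could be served by a pacing helper along the following lines, which a future WorkQueue wrapper might call before handing out each batch. This is a sketch only, not part of the patch; the class name and the targetOpsPerSecond parameter are assumptions:

    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;

    // Sketch: delay each batch by an exponentially distributed gap so that batch
    // starts approximate a Poisson process at the target rate, rather than the
    // perfectly smooth pacing produced by RateLimiter.
    final class PoissonPacer
    {
        private final Random random = new Random();
        private final double targetOpsPerSecond; // assumed configuration value

        PoissonPacer(double targetOpsPerSecond)
        {
            this.targetOpsPerSecond = targetOpsPerSecond;
        }

        void pauseBefore(int opCount)
        {
            // mean gap required for opCount operations at the target rate
            double meanNanos = TimeUnit.SECONDS.toNanos(1) * (opCount / targetOpsPerSecond);
            // inverse-CDF sample from an exponential distribution with that mean
            long gapNanos = (long) (-Math.log(1 - random.nextDouble()) * meanNanos);
            LockSupport.parkNanos(gapNanos);
        }
    }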
 
-    /**
-     * Produces exactly N items (awaits each to be consumed)
-     */
-    private class Producer extends Thread
+    private class Consumer extends Thread
     {
-        private volatile boolean stop = false;
+
+        private final Operation.State state;
+        private final RateLimiter rateLimiter;
+        private volatile boolean success = true;
+        private final WorkQueue workQueue;
+        private final CountDownLatch done;
+
+        public Consumer(Command type, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
+        {
+            this.done = done;
+            this.rateLimiter = rateLimiter;
+            this.workQueue = workQueue;
+            this.state = new Operation.State(type, settings, metrics);
+        }
 
         public void run()
         {
-            for (int i = 0; i < client.getNumKeys(); i++)
+
+            try
             {
-                if (stop)
-                    break;
 
-                try
+                SimpleClient sclient = null;
+                ThriftClient tclient = null;
+                JavaDriverClient jclient = null;
+
+                switch (settings.mode.api)
                 {
-                    operations.put(createOperation(i % client.getNumDifferentKeys()));
+                    case JAVA_DRIVER_NATIVE:
+                        jclient = settings.getJavaDriverClient();
+                        break;
+                    case SIMPLE_NATIVE:
+                        sclient = settings.getSimpleNativeClient();
+                        break;
+                    case THRIFT:
+                        tclient = settings.getThriftClient();
+                        break;
+                    case THRIFT_SMART:
+                        tclient = settings.getSmartThriftClient();
+                        break;
+                    default:
+                        throw new IllegalStateException();
                 }
-                catch (InterruptedException e)
+
+                Work work;
+                while ( null != (work = workQueue.poll()) )
                 {
-                    if (e.getMessage() != null)
-                        System.err.println("Producer error - " + e.getMessage());
-                    return;
+
+                    if (rateLimiter != null)
+                        rateLimiter.acquire(work.count);
+
+                    for (int i = 0 ; i < work.count ; i++)
+                    {
+                        try
+                        {
+                            Operation op = createOperation(state, i + work.offset);
+                            switch (settings.mode.api)
+                            {
+                                case JAVA_DRIVER_NATIVE:
+                                    op.run(jclient);
+                                    break;
+                                case SIMPLE_NATIVE:
+                                    op.run(sclient);
+                                    break;
+                                default:
+                                    op.run(tclient);
+                            }
+                        } catch (Exception e)
+                        {
+                            if (output == null)
+                            {
+                                System.err.println(e.getMessage());
+                                success = false;
+                                System.exit(-1);
+                            }
+
+                            e.printStackTrace(output);
+                            success = false;
+                            workQueue.stop();
+                            state.metrics.cancel();
+                            return;
+                        }
+                    }
                 }
+
             }
+            finally
+            {
+                done.countDown();
+                state.timer.close();
+            }
+
         }
 
-        public void stopProducer()
+    }
+
+    private interface WorkQueue
+    {
+        // null indicates consumer should terminate
+        Work poll();
+
+        // signal all consumers to terminate
+        void stop();
+    }
+
+    private static final class Work
+    {
+        // index of the first operation in this batch
+        final long offset;
+
+        // how many operations to perform
+        final int count;
+
+        public Work(long offset, int count)
         {
-            stop = true;
+            this.offset = offset;
+            this.count = count;
         }
     }
 
-    /**
-     * Each consumes exactly N items from queue
-     */
-    private class Consumer extends Thread
+    private static final class FixedWorkQueue implements WorkQueue
     {
-        private final int items;
-        private final RateLimiter rateLimiter;
-        private volatile boolean stop = false;
-        private volatile int returnCode = StressAction.SUCCESS;
 
-        public Consumer(int toConsume, RateLimiter rateLimiter)
+        final ArrayBlockingQueue<Work> work;
+        volatile boolean stop = false;
+
+        public FixedWorkQueue(ArrayBlockingQueue<Work> work)
         {
-            items = toConsume;
-            this.rateLimiter = rateLimiter;
+            this.work = work;
         }
 
-        public void run()
+        @Override
+        public Work poll()
+        {
+            if (stop)
+                return null;
+            return work.poll();
+        }
+
+        @Override
+        public void stop()
+        {
+            stop = true;
+        }
+
+        static FixedWorkQueue build(long operations)
         {
-            if (client.use_native_protocol)
+            // split the run into at most ~500k work items, with a minimum batch size of 20
+            if (operations > Integer.MAX_VALUE * (1L << 19))
+                throw new IllegalStateException("Cannot currently support more than approx 2^50 operations for one stress run. This is a LOT.");
+            int batchSize = (int) (operations / (1 << 19));
+            if (batchSize < 20)
+                batchSize = 20;
+            ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<Work>(
+                    (int) ((operations / batchSize)
+                  + (operations % batchSize == 0 ? 0 : 1))
+            );
+            long offset = 0;
+            while (offset < operations)
             {
-                SimpleClient connection = client.getNativeClient();
+                work.add(new Work(offset, (int) Math.min(batchSize, operations - offset)));
+                offset += batchSize;
+            }
+            return new FixedWorkQueue(work);
+        }
 
-                for (int i = 0; i < items; i++)
-                {
-                    if (stop)
-                        break;
+    }
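To illustrate how build sizes the batches: for a run of, say, 1,000,000 operations, the division by 2^19 rounds down to 1, the minimum of 20 applies, and the queue is sized for 50,000 Work items:

    long operations = 1000000L;                           // illustrative run size
    int batchSize = (int) (operations / (1 << 19));       // = 1
    if (batchSize < 20)
        batchSize = 20;                                   // clamped to the minimum of 20
    int batches = (int) ((operations / batchSize)
                + (operations % batchSize == 0 ? 0 : 1)); // = 50000 batches of 20 operations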
 
-                    try
-                    {
-                        rateLimiter.acquire();
-                        operations.take().run(connection); // running job
-                    }
-                    catch (Exception e)
-                    {
-                        if (output == null)
-                        {
-                            System.err.println(e.getMessage());
-                            returnCode = StressAction.FAILURE;
-                            System.exit(-1);
-                        }
+    private static final class ContinuousWorkQueue implements WorkQueue
+    {
 
-                        output.println(e.getMessage());
-                        returnCode = StressAction.FAILURE;
-                        break;
-                    }
-                }
-            }
-            else
-            {
-                CassandraClient connection = client.getClient();
+        final AtomicLong offset = new AtomicLong();
+        final int batchSize;
+        volatile boolean stop = false;
 
-                for (int i = 0; i < items; i++)
-                {
-                    if (stop)
-                        break;
+        private ContinuousWorkQueue(int batchSize)
+        {
+            this.batchSize = batchSize;
+        }
 
-                    try
-                    {
-                        rateLimiter.acquire();
-                        operations.take().run(connection); // running job
-                    }
-                    catch (Exception e)
-                    {
-                        if (output == null)
-                        {
-                            System.err.println(e.getMessage());
-                            returnCode = StressAction.FAILURE;
-                            System.exit(-1);
-                        }
+        @Override
+        public Work poll()
+        {
+            if (stop)
+                return null;
+            return new Work(nextOffset(), batchSize);
+        }
 
-                        output.println(e.getMessage());
-                        returnCode = StressAction.FAILURE;
-                        break;
-                    }
-                }
+        private long nextOffset()
+        {
+            final int inc = batchSize;
+            while (true)
+            {
+                final long cur = offset.get();
+                if (offset.compareAndSet(cur, cur + inc))
+                    return cur;
             }
         }
 
-        public void stopConsume()
+        @Override
+        public void stop()
         {
             stop = true;
         }
 
-        public int getReturnCode()
-        {
-            return returnCode;
-        }
     }
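The CAS loop in nextOffset amounts to an atomic fetch-and-add; an equivalent, shorter form (behaviour unchanged) would be:

    private long nextOffset()
    {
        return offset.getAndAdd(batchSize); // returns the previous offset, exactly like the CAS loop
    }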
 
-    private Operation createOperation(int index)
+    private Operation createOperation(Operation.State state, long index)
+    {
+        return createOperation(state.type, state, index);
+    }
+    private Operation createOperation(Command type, Operation.State state, long index)
     {
-        switch (client.getOperation())
+        switch (type)
         {
             case READ:
-                return client.isCQL() ? new CqlReader(client, index) : new Reader(client, index);
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftReader(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlReader(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+
+
+            case COUNTERREAD:
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftCounterGetter(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlCounterGetter(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+
+            case WRITE:
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftInserter(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlInserter(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
 
-            case COUNTER_GET:
-                return client.isCQL() ? new CqlCounterGetter(client, index) : new CounterGetter(client, index);
+            case COUNTERWRITE:
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftCounterAdder(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlCounterAdder(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
 
-            case INSERT:
-                return client.isCQL() ? new CqlInserter(client, index) : new Inserter(client, index);
+            case RANGESLICE:
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftRangeSlicer(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlRangeSlicer(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
 
-            case COUNTER_ADD:
-                return client.isCQL() ? new CqlCounterAdder(client, index) : new CounterAdder(client, index);
+            case IRANGESLICE:
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftIndexedRangeSlicer(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlIndexedRangeSlicer(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
 
-            case RANGE_SLICE:
-                return client.isCQL() ? new CqlRangeSlicer(client, index) : new RangeSlicer(client, index);
+            case READMULTI:
+                switch(state.settings.mode.style)
+                {
+                    case THRIFT:
+                        return new ThriftMultiGetter(state, index);
+                    case CQL:
+                    case CQL_PREPARED:
+                        return new CqlMultiGetter(state, index);
+                    default:
+                        throw new UnsupportedOperationException();
+                }
 
-            case INDEXED_RANGE_SLICE:
-                return client.isCQL() ? new CqlIndexedRangeSlicer(client, index) : new IndexedRangeSlicer(client, index);
+            case MIXED:
+                return createOperation(state.readWriteSelector.next(), state, index);
 
-            case MULTI_GET:
-                return client.isCQL() ? new CqlMultiGetter(client, index) : new MultiGetter(client, index);
         }
 
         throw new UnsupportedOperationException();
     }
 
-    public void stopAction()
-    {
-        stop = true;
-    }
 }
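With StressAction now a plain Runnable rather than a Thread, driving a run programmatically mirrors the new main() in Stress.java. A minimal sketch, assuming the arguments array is in whatever form StressSettings.parse accepts (see StressSettings.printHelp() for the option syntax); the wrapper class name is hypothetical:

    import java.io.PrintStream;

    import org.apache.cassandra.stress.StressAction;
    import org.apache.cassandra.stress.settings.StressSettings;

    public class RunStressOnce
    {
        public static void main(String[] args) throws Exception
        {
            StressSettings settings = StressSettings.parse(args); // throws IllegalArgumentException on bad input
            PrintStream out = System.out;
            new StressAction(settings, out).run(); // blocks until the run completes or fails; disconnects on exit
        }
    }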