You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/12/24 03:08:41 UTC
[2/6] Improve Stress Tool patch by Benedict;
reviewed by Pavel Yaskevich for CASSANDRA-6199
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
new file mode 100644
index 0000000..6cef0bf
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
@@ -0,0 +1,130 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.stress.generatedata.DataGenHexFromDistribution;
+import org.apache.cassandra.stress.generatedata.DataGenHexFromOpIndex;
+import org.apache.cassandra.stress.generatedata.DistributionFactory;
+import org.apache.cassandra.stress.generatedata.KeyGen;
+
+// Settings for key generation
+public class SettingsKey implements Serializable
+{
+
+ private final int keySize;
+ private final DistributionFactory distribution;
+ private final long[] range;
+
+ public SettingsKey(DistributionOptions options)
+ {
+ this.keySize = Integer.parseInt(options.size.value());
+ this.distribution = options.dist.get();
+ this.range = null;
+ }
+
+ public SettingsKey(PopulateOptions options)
+ {
+ this.keySize = Integer.parseInt(options.size.value());
+ this.distribution = null;
+ String[] bounds = options.populate.value().split("\\.\\.+");
+ this.range = new long[] { Long.parseLong(bounds[0]), Long.parseLong(bounds[1]) };
+ }
+
+ // Option Declarations
+
+ private static final class DistributionOptions extends GroupedOptions
+ {
+ final OptionDistribution dist;
+ final OptionSimple size = new OptionSimple("size=", "[0-9]+", "10", "Key size in bytes", false);
+
+ public DistributionOptions(String defaultLimit)
+ {
+ dist = new OptionDistribution("dist=", "GAUSSIAN(1.." + defaultLimit + ")");
+ }
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(dist, size);
+ }
+ }
+
+ private static final class PopulateOptions extends GroupedOptions
+ {
+ final OptionSimple populate;
+ final OptionSimple size = new OptionSimple("size=", "[0-9]+", "10", "Key size in bytes", false);
+
+ public PopulateOptions(String defaultLimit)
+ {
+ populate = new OptionSimple("populate=", "[0-9]+\\.\\.+[0-9]+",
+ "1.." + defaultLimit,
+ "Populate all keys in sequence", true);
+ }
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(populate, size);
+ }
+ }
+
+    /**
+     * Creates a new key generator for these settings: hex keys derived from the operation index
+     * when a range was configured, otherwise hex keys drawn from a fresh distribution instance.
+     */
+    public KeyGen newKeyGen()
+    {
+        if (range == null)
+            return new KeyGen(new DataGenHexFromDistribution(distribution.get()), keySize);
+
+        final long first = range[0];
+        final long second = range[1];
+        return new KeyGen(new DataGenHexFromOpIndex(first, second), keySize);
+    }
+
+ // CLI Utility Methods
+
+ /**
+ * Builds the key settings from the parsed command line, consuming (removing) the "-key" entry from clArgs.
+ *
+ * When "-key" is absent, WRITE and COUNTERWRITE commands default to populating keys in sequence;
+ * every other command type defaults to the distribution-based options.
+ * When "-key" is present but matches neither option group, help is printed and the JVM exits with status 1.
+ *
+ * @param clArgs command line arguments grouped by option name; the "-key" entry is removed
+ * @param command the command being run; its count supplies the default upper limit for keys
+ * @return the key settings for this run
+ */
+ public static SettingsKey get(Map<String, String[]> clArgs, SettingsCommand command)
+ {
+ // set default size to number of commands requested, unless set to err convergence, then use 1M
+ // (a non-positive command.count is treated as "no fixed count" here)
+ String defaultLimit = command.count <= 0 ? "1000000" : Long.toString(command.count);
+
+ String[] params = clArgs.remove("-key");
+ if (params == null)
+ {
+ // return defaults:
+ switch(command.type)
+ {
+ case WRITE:
+ case COUNTERWRITE:
+ return new SettingsKey(new PopulateOptions(defaultLimit));
+ default:
+ return new SettingsKey(new DistributionOptions(defaultLimit));
+ }
+ }
+ // try each option group against the supplied parameters; null means none of them matched
+ GroupedOptions options = GroupedOptions.select(params, new PopulateOptions(defaultLimit), new DistributionOptions(defaultLimit));
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -key options provided, see output for valid options");
+ System.exit(1);
+ }
+ // only the two groups passed to select() above can come back, so the else branch is DistributionOptions
+ return options instanceof PopulateOptions ?
+ new SettingsKey((PopulateOptions) options) :
+ new SettingsKey((DistributionOptions) options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-key", new PopulateOptions("N"), new DistributionOptions("N"));
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
new file mode 100644
index 0000000..6a8e510
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsLog.java
@@ -0,0 +1,92 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SettingsLog implements Serializable
+{
+
+ public final boolean noSummary;
+ public final File file;
+ public final int intervalMillis;
+
+ public SettingsLog(Options options)
+ {
+ noSummary = options.noSummmary.setByUser();
+
+ if (options.outputFile.setByUser())
+ file = new File(options.outputFile.value());
+ else
+ file = null;
+
+ String interval = options.interval.value();
+ if (interval.endsWith("ms"))
+ intervalMillis = Integer.parseInt(interval.substring(0, interval.length() - 2));
+ else if (interval.endsWith("s"))
+ intervalMillis = 1000 * Integer.parseInt(interval.substring(0, interval.length() - 1));
+ else
+ intervalMillis = 1000 * Integer.parseInt(interval);
+ if (intervalMillis <= 0)
+ throw new IllegalArgumentException("Log interval must be greater than zero");
+ }
+
+    /**
+     * Opens the stream that log output should be written to: a stream over the configured
+     * file when one was given, otherwise a new PrintStream wrapping System.out.
+     *
+     * @throws FileNotFoundException if the configured file cannot be opened for writing
+     */
+    public PrintStream getOutput() throws FileNotFoundException
+    {
+        if (file != null)
+            return new PrintStream(file);
+        return new PrintStream(System.out);
+    }
+
+ // Option Declarations
+
+ public static final class Options extends GroupedOptions
+ {
+ final OptionSimple noSummmary = new OptionSimple("no-summary", "", null, "Disable printing of aggregate statistics at the end of a test", false);
+ final OptionSimple outputFile = new OptionSimple("file=", ".*", null, "Log to a file", false);
+ final OptionSimple interval = new OptionSimple("interval=", "[0-9]+(ms|s|)", "1s", "Log progress every <value> seconds or milliseconds", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(noSummmary, outputFile, interval);
+ }
+ }
+
+ // CLI Utility Methods
+
+ public static SettingsLog get(Map<String, String[]> clArgs)
+ {
+ String[] params = clArgs.remove("-log");
+ if (params == null)
+ return new SettingsLog(new Options());
+
+ GroupedOptions options = GroupedOptions.select(params, new Options());
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -log options provided, see output for valid options");
+ System.exit(1);
+ }
+ return new SettingsLog((Options) options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-log", new Options());
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMisc.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMisc.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMisc.java
new file mode 100644
index 0000000..2092c02
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMisc.java
@@ -0,0 +1,200 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.stress.generatedata.Distribution;
+
+public class SettingsMisc implements Serializable
+{
+
+    /**
+     * Runs the first special (non-stress) action requested on the command line, if any:
+     * help is tried first, then distribution printing.
+     *
+     * @return true if a special action was performed
+     */
+    static boolean maybeDoSpecial(Map<String, String[]> clArgs)
+    {
+        // short-circuit: the distribution printer is only consulted when help was not requested
+        return maybePrintHelp(clArgs) || maybePrintDistribution(clArgs);
+    }
+
+ static final class PrintDistribution extends GroupedOptions
+ {
+ final OptionDistribution dist = new OptionDistribution("dist=", null);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(dist);
+ }
+ }
+
+ static boolean maybePrintDistribution(Map<String, String[]> clArgs)
+ {
+ final String[] args = clArgs.get("print");
+ if (args == null)
+ return false;
+ final PrintDistribution dist = new PrintDistribution();
+ if (null == GroupedOptions.select(args, dist))
+ {
+ printHelpPrinter().run();
+ System.out.println("Invalid print options provided, see output for valid options");
+ System.exit(1);
+ }
+ printDistribution(dist.dist.get().get());
+ return true;
+ }
+
+ /**
+ * Prints a three column summary of the distribution to System.out.
+ * For each cumulative probability in a fixed list (10% .. 100%) one row is written containing
+ * the probability as a percentage, the value of inverseCumProb at that probability, and that
+ * value expressed as a percentage of inverseCumProb(1).
+ *
+ * @param dist the distribution to summarise
+ */
+ static void printDistribution(Distribution dist)
+ {
+ PrintStream out = System.out;
+ out.println("% of samples Range % of total");
+ String format = "%-16.1f%-12d%12.1f";
+ // pre-divide by 100 so that (value / rangemax) below is already a percentage
+ double rangemax = dist.inverseCumProb(1d) / 100d;
+ for (double d : new double[] { 0.1d, 0.2d, 0.3d, 0.4d, 0.5d, 0.6d, 0.7d, 0.8d, 0.9d, 0.95d, 0.99d, 1d })
+ {
+ double sampleperc = d * 100;
+ long max = dist.inverseCumProb(d);
+ double rangeperc = max/ rangemax;
+ out.println(String.format(format, sampleperc, max, rangeperc));
+ }
+ }
+
+ /**
+ * Handles a help request ("-?" or "help") if one is present in clArgs.
+ *
+ * With no topic and nothing else on the command line, the general help is printed.
+ * With exactly one topic, the help for that command or option is printed; the topic may be given
+ * as an argument to help, or as the single other argument-less entry on the command line.
+ *
+ * @param clArgs command line arguments grouped by option name; the help entry is removed
+ * @return false if help was not requested, true once help has been printed
+ * @throws IllegalArgumentException if help was requested but no single topic could be determined
+ */
+ private static boolean maybePrintHelp(Map<String, String[]> clArgs)
+ {
+ if (!clArgs.containsKey("-?") && !clArgs.containsKey("help"))
+ return false;
+ // "-?" takes precedence; "help" is only consumed when "-?" was not present
+ String[] params = clArgs.remove("-?");
+ if (params == null)
+ params = clArgs.remove("help");
+ if (params.length == 0)
+ {
+ if (!clArgs.isEmpty())
+ {
+ // help was given no topic of its own: accept a lone, argument-less entry as the topic
+ if (clArgs.size() == 1)
+ {
+ String p = clArgs.keySet().iterator().next();
+ if (clArgs.get(p).length == 0)
+ params = new String[] {p};
+ }
+ // otherwise params stays empty and we fall through to the exception below
+ }
+ else
+ {
+ printHelp();
+ return true;
+ }
+ }
+ if (params.length == 1)
+ {
+ printHelp(params[0]);
+ return true;
+ }
+ // reached with zero topics alongside other arguments, or with more than one topic
+ throw new IllegalArgumentException("Invalid command/option provided to help");
+ }
+
+ public static void printHelp()
+ {
+ System.out.println("Usage: ./bin/cassandra-stress <command> [options]");
+ System.out.println();
+ System.out.println("---Commands---");
+ for (Command cmd : Command.values())
+ {
+ System.out.println(String.format("%-20s : %s", cmd.toString().toLowerCase(), cmd.description));
+ }
+ System.out.println();
+ System.out.println("---Options---");
+ for (CliOption cmd : CliOption.values())
+ {
+ System.out.println(String.format("-%-20s : %s", cmd.toString().toLowerCase(), cmd.description));
+ }
+ }
+
+ public static void printHelp(String command)
+ {
+ Command cmd = Command.get(command);
+ if (cmd != null)
+ {
+ cmd.printHelp();
+ return;
+ }
+ CliOption opt = CliOption.get(command);
+ if (opt != null)
+ {
+ opt.printHelp();
+ return;
+ }
+ printHelp();
+ throw new IllegalArgumentException("Invalid command or option provided to command help");
+ }
+
+ public static Runnable helpHelpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ System.out.println("Usage: ./bin/cassandra-stress help <command|option>");
+ System.out.println("Commands:");
+ for (Command cmd : Command.values())
+ System.out.println(" " + cmd.toString().toLowerCase() + (cmd.extraName != null ? ", " + cmd.extraName : ""));
+ System.out.println("Options:");
+ for (CliOption op : CliOption.values())
+ System.out.println(" -" + op.toString().toLowerCase() + (op.extraName != null ? ", " + op.extraName : ""));
+ }
+ };
+ }
+
+ public static Runnable printHelpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ GroupedOptions.printOptions(System.out, "print", new GroupedOptions()
+ {
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(new OptionDistribution("dist=", null));
+ }
+ });
+ }
+ };
+ }
+
+    /**
+     * Returns a task that prints usage help for the send-to-daemon option to System.out.
+     * The usage line names the option exactly as getSendToDaemon looks it up ("-send-to"),
+     * so the printed help matches what is actually accepted.
+     */
+    public static Runnable sendToDaemonHelpPrinter()
+    {
+        return new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                System.out.println("Usage: -send-to <host>");
+                System.out.println();
+                System.out.println("Specify a host running the stress server to send this stress command to");
+            }
+        };
+    }
+
+    /**
+     * Extracts the daemon host from the command line, consuming the "-send-to" entry
+     * (or, when that is absent, the "-sendto" entry) from clArgs.
+     *
+     * @return the single host given, or null if neither option was supplied; if the option was
+     *         supplied with anything other than exactly one value, help is printed and the JVM
+     *         exits with status 1
+     */
+    public static String getSendToDaemon(Map<String, String[]> clArgs)
+    {
+        String[] hosts = null;
+        for (String key : new String[] { "-send-to", "-sendto" })
+        {
+            hosts = clArgs.remove(key);
+            if (hosts != null)
+                break;
+        }
+        if (hosts == null)
+            return null;
+        if (hosts.length == 1)
+            return hosts[0];
+
+        sendToDaemonHelpPrinter().run();
+        System.out.println("Invalid -send-to specifier: " + Arrays.toString(hosts));
+        System.exit(1);
+        return hosts[0];
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
new file mode 100644
index 0000000..1800b28
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
@@ -0,0 +1,154 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.datastax.driver.core.ProtocolOptions;
+
+public class SettingsMode implements Serializable
+{
+
+ public final ConnectionAPI api;
+ public final ConnectionStyle style;
+ public final CqlVersion cqlVersion;
+ private final String compression;
+
+ public SettingsMode(GroupedOptions options)
+ {
+ if (options instanceof Cql3Options)
+ {
+ cqlVersion = CqlVersion.CQL3;
+ Cql3Options opts = (Cql3Options) options;
+ api = opts.useNative.setByUser() ? ConnectionAPI.JAVA_DRIVER_NATIVE : ConnectionAPI.THRIFT;
+ style = opts.usePrepared.setByUser() ? ConnectionStyle.CQL_PREPARED : ConnectionStyle.CQL;
+ compression = ProtocolOptions.Compression.valueOf(opts.useCompression.value().toUpperCase()).name();
+ }
+ else if (options instanceof Cql3SimpleNativeOptions)
+ {
+ cqlVersion = CqlVersion.CQL3;
+ Cql3SimpleNativeOptions opts = (Cql3SimpleNativeOptions) options;
+ api = ConnectionAPI.SIMPLE_NATIVE;
+ style = opts.usePrepared.setByUser() ? ConnectionStyle.CQL_PREPARED : ConnectionStyle.CQL;
+ compression = ProtocolOptions.Compression.NONE.name();
+ }
+ else if (options instanceof Cql2Options)
+ {
+ cqlVersion = CqlVersion.CQL2;
+ api = ConnectionAPI.THRIFT;
+ Cql2Options opts = (Cql2Options) options;
+ style = opts.usePrepared.setByUser() ? ConnectionStyle.CQL_PREPARED : ConnectionStyle.CQL;
+ compression = ProtocolOptions.Compression.NONE.name();
+ }
+ else if (options instanceof ThriftOptions)
+ {
+ ThriftOptions opts = (ThriftOptions) options;
+ cqlVersion = CqlVersion.NOCQL;
+ api = opts.smart.setByUser() ? ConnectionAPI.THRIFT_SMART : ConnectionAPI.THRIFT;
+ style = ConnectionStyle.THRIFT;
+ compression = ProtocolOptions.Compression.NONE.name();
+ }
+ else
+ throw new IllegalStateException();
+ }
+
+ public ProtocolOptions.Compression compression()
+ {
+ return ProtocolOptions.Compression.valueOf(compression);
+ }
+
+ // Option Declarations
+
+ private static final class Cql3Options extends GroupedOptions
+ {
+ final OptionSimple api = new OptionSimple("cql3", "", null, "", true);
+ final OptionSimple useNative = new OptionSimple("native", "", null, "", false);
+ final OptionSimple usePrepared = new OptionSimple("prepared", "", null, "", false);
+ final OptionSimple useCompression = new OptionSimple("compression=", "none|lz4|snappy", "none", "", false);
+ final OptionSimple port = new OptionSimple("port=", "[0-9]+", "9046", "", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(useNative, usePrepared, api, useCompression, port);
+ }
+ }
+
+ private static final class Cql3SimpleNativeOptions extends GroupedOptions
+ {
+ final OptionSimple api = new OptionSimple("cql3", "", null, "", true);
+ final OptionSimple useSimpleNative = new OptionSimple("simplenative", "", null, "", true);
+ final OptionSimple usePrepared = new OptionSimple("prepared", "", null, "", false);
+ final OptionSimple port = new OptionSimple("port=", "[0-9]+", "9046", "", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(useSimpleNative, usePrepared, api, port);
+ }
+ }
+
+ private static final class Cql2Options extends GroupedOptions
+ {
+ final OptionSimple api = new OptionSimple("cql2", "", null, "", true);
+ final OptionSimple usePrepared = new OptionSimple("prepared", "", null, "", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(usePrepared, api);
+ }
+ }
+
+ private static final class ThriftOptions extends GroupedOptions
+ {
+ final OptionSimple api = new OptionSimple("thrift", "", null, "", true);
+ final OptionSimple smart = new OptionSimple("smart", "", null, "", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(api, smart);
+ }
+ }
+
+ // CLI Utility Methods
+
+ public static SettingsMode get(Map<String, String[]> clArgs)
+ {
+ String[] params = clArgs.remove("-mode");
+ if (params == null)
+ {
+ ThriftOptions opts = new ThriftOptions();
+ opts.smart.accept("smart");
+ return new SettingsMode(opts);
+ }
+
+ GroupedOptions options = GroupedOptions.select(params, new ThriftOptions(), new Cql2Options(), new Cql3Options(), new Cql3SimpleNativeOptions());
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -mode options provided, see output for valid options");
+ System.exit(1);
+ }
+ return new SettingsMode(options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-mode", new ThriftOptions(), new Cql2Options(), new Cql3Options(), new Cql3SimpleNativeOptions());
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
new file mode 100644
index 0000000..2888987
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
@@ -0,0 +1,103 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SettingsNode implements Serializable
+{
+
+ public final List<String> nodes;
+
+ public SettingsNode(Options options)
+ {
+ if (options.file.setByUser())
+ {
+ try
+ {
+ String node;
+ List<String> tmpNodes = new ArrayList<String>();
+ BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(options.file.value())));
+ try
+ {
+ while ((node = in.readLine()) != null)
+ {
+ if (node.length() > 0)
+ tmpNodes.add(node);
+ }
+ nodes = Arrays.asList(tmpNodes.toArray(new String[tmpNodes.size()]));
+ }
+ finally
+ {
+ in.close();
+ }
+ }
+ catch(IOException ioe)
+ {
+ throw new RuntimeException(ioe);
+ }
+
+ }
+ else
+ nodes = Arrays.asList(options.list.value().split(","));
+ }
+
+    /**
+     * Picks one of the configured nodes at random.
+     */
+    public String randomNode()
+    {
+        final int count = nodes.size();
+        // clamp to the last element in case the scaled random value ever lands on count
+        final int index = Math.min((int) (Math.random() * count), count - 1);
+        return nodes.get(index);
+    }
+
+ // Option Declarations
+
+ public static final class Options extends GroupedOptions
+ {
+ final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false);
+ final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of hosts", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(file, list);
+ }
+ }
+
+ // CLI Utility Methods
+
+ public static SettingsNode get(Map<String, String[]> clArgs)
+ {
+ String[] params = clArgs.remove("-node");
+ if (params == null)
+ return new SettingsNode(new Options());
+
+ GroupedOptions options = GroupedOptions.select(params, new Options());
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -node options provided, see output for valid options");
+ System.exit(1);
+ }
+ return new SettingsNode((Options) options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-node", new Options());
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
new file mode 100644
index 0000000..4d9b0ba
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPort.java
@@ -0,0 +1,70 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SettingsPort implements Serializable
+{
+
+ public final int nativePort;
+ public final int thriftPort;
+
+ public SettingsPort(PortOptions options)
+ {
+ nativePort = Integer.parseInt(options.nativePort.value());
+ thriftPort = Integer.parseInt(options.thriftPort.value());
+ }
+
+ // Option Declarations
+
+ private static final class PortOptions extends GroupedOptions
+ {
+ final OptionSimple nativePort = new OptionSimple("native=", "[0-9]+", "9042", "Use this port for the Cassandra native protocol", false);
+ final OptionSimple thriftPort = new OptionSimple("thrift=", "[0-9]+", "9160", "Use this port for the thrift protocol", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(nativePort, thriftPort);
+ }
+ }
+
+ // CLI Utility Methods
+
+ public static SettingsPort get(Map<String, String[]> clArgs)
+ {
+ String[] params = clArgs.remove("-port");
+ if (params == null)
+ {
+ return new SettingsPort(new PortOptions());
+ }
+ PortOptions options = GroupedOptions.select(params, new PortOptions());
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -port options provided, see output for valid options");
+ System.exit(1);
+ }
+ return new SettingsPort(options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-port", new PortOptions());
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
new file mode 100644
index 0000000..c5aff7a
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java
@@ -0,0 +1,116 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SettingsRate implements Serializable
+{
+
+ public final boolean auto;
+ public final int minAutoThreads;
+ public final int maxAutoThreads;
+ public final int threadCount;
+ public final int opRateTargetPerSecond;
+
+ public SettingsRate(ThreadOptions options)
+ {
+ auto = false;
+ threadCount = Integer.parseInt(options.threads.value());
+ String rateOpt = options.rate.value();
+ opRateTargetPerSecond = Integer.parseInt(rateOpt.substring(0, rateOpt.length() - 2));
+ minAutoThreads = -1;
+ maxAutoThreads = -1;
+ }
+
+ public SettingsRate(AutoOptions auto)
+ {
+ this.auto = true;
+ this.minAutoThreads = Integer.parseInt(auto.minThreads.value());
+ this.maxAutoThreads = Integer.parseInt(auto.maxThreads.value());
+ this.threadCount = -1;
+ this.opRateTargetPerSecond = 0;
+ }
+
+
+ // Option Declarations
+
+ private static final class AutoOptions extends GroupedOptions
+ {
+ final OptionSimple auto = new OptionSimple("auto", "", null, "test with increasing number of threadCount until performance plateaus", false);
+ final OptionSimple minThreads = new OptionSimple("threads>=", "[0-9]+", "4", "run at least this many clients concurrently", false);
+ final OptionSimple maxThreads = new OptionSimple("threads<=", "[0-9]+", "1000", "run at most this many clients concurrently", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(auto, minThreads, maxThreads);
+ }
+ }
+
+ private static final class ThreadOptions extends GroupedOptions
+ {
+ final OptionSimple threads = new OptionSimple("threads=", "[0-9]+", null, "run this many clients concurrently", true);
+ final OptionSimple rate = new OptionSimple("limit=", "[0-9]+/s", "0/s", "limit operations per second across all clients", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(threads, rate);
+ }
+ }
+
+ // CLI Utility Methods
+
+ /**
+ * Builds the rate settings from the parsed command line, consuming (removing) the "-rate" entry from clArgs.
+ *
+ * When "-rate" is absent, WRITE and COUNTERWRITE commands with a positive count run with a fixed
+ * "threads=50"; every other case uses the auto options with their defaults.
+ * When "-rate" is present but matches neither option group, help is printed and the JVM exits with status 1.
+ *
+ * @param clArgs command line arguments grouped by option name; the "-rate" entry is removed
+ * @param command the command being run; its type and count select the default
+ * @return the rate settings for this run
+ */
+ public static SettingsRate get(Map<String, String[]> clArgs, SettingsCommand command)
+ {
+ String[] params = clArgs.remove("-rate");
+ if (params == null)
+ {
+ // note: the switch has no default and no break; anything that does not return inside it
+ // (other command types, or a write with count <= 0) falls out to the auto options below
+ switch (command.type)
+ {
+ case WRITE:
+ case COUNTERWRITE:
+ if (command.count > 0)
+ {
+ ThreadOptions options = new ThreadOptions();
+ options.accept("threads=50");
+ return new SettingsRate(options);
+ }
+ }
+ return new SettingsRate(new AutoOptions());
+ }
+ // try each option group against the supplied parameters; null means none of them matched
+ GroupedOptions options = GroupedOptions.select(params, new AutoOptions(), new ThreadOptions());
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -rate options provided, see output for valid options");
+ System.exit(1);
+ }
+ if (options instanceof AutoOptions)
+ return new SettingsRate((AutoOptions) options);
+ else if (options instanceof ThreadOptions)
+ return new SettingsRate((ThreadOptions) options);
+ else
+ throw new IllegalStateException();
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-rate", new ThreadOptions(), new AutoOptions());
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
new file mode 100644
index 0000000..6fc03e9
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
@@ -0,0 +1,236 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SettingsSchema implements Serializable
+{
+
+ public static final String DEFAULT_COMPARATOR = "AsciiType";
+ public static final String DEFAULT_VALIDATOR = "BytesType";
+
+ private final String replicationStrategy;
+ private final Map<String, String> replicationStrategyOptions;
+
+ private final IndexType indexType;
+ private final boolean replicateOnWrite;
+ private final String compression;
+ private final String compactionStrategy;
+ public final String keyspace;
+ public final String columnFamily;
+
+    /**
+     * Builds the schema settings from the parsed "-schema" options.
+     *
+     * The index type is only set when the index option was given by the user. When a compaction
+     * strategy is configured it is validated eagerly so that a bad value is reported at start-up.
+     *
+     * @param options the parsed schema options
+     * @throws IllegalArgumentException if the compaction strategy is rejected; the underlying
+     *                                  ConfigurationException is kept as the cause
+     */
+    public SettingsSchema(Options options)
+    {
+        replicateOnWrite = !options.noReplicateOnWrite.setByUser();
+        replicationStrategy = options.replication.getStrategy();
+        replicationStrategyOptions = options.replication.getOptions();
+        if (options.index.setByUser())
+            indexType = IndexType.valueOf(options.index.value().toUpperCase());
+        else
+            indexType = null;
+        compression = options.compression.value();
+        compactionStrategy = options.compactionStrategy.value();
+        if (compactionStrategy != null)
+        {
+            try
+            {
+                // called only for validation; the result is discarded
+                CFMetaData.createCompactionStrategy(compactionStrategy);
+            }
+            catch (ConfigurationException e)
+            {
+                // keep the original exception as the cause so the reason for rejection is not lost
+                throw new IllegalArgumentException("Invalid compaction strategy: " + compactionStrategy, e);
+            }
+        }
+        keyspace = options.keyspace.value();
+        columnFamily = options.columnFamily.value();
+    }
+
+ private void createKeyspacesCql3(StressSettings settings)
+ {
+// settings.getJavaDriverClient().execute("create table Standard1")
+ }
+
+ public void createKeySpaces(StressSettings settings)
+ {
+ createKeySpacesThrift(settings);
+ }
+
+
+    /**
+     * Create Keyspace with Standard and Super/Counter column families.
+     *
+     * Defines the standard, super, counter and super-counter column families, adds the keyspace
+     * through the raw thrift client, creates the CQL3 counter table, and then sleeps one second
+     * per configured node to let the schema propagate. Failures are reported on System.err
+     * rather than thrown; if the sleep is interrupted the thread's interrupt status is restored.
+     *
+     * @param settings the stress settings supplying column, mode and node configuration and the thrift client
+     */
+    public void createKeySpacesThrift(StressSettings settings)
+    {
+        KsDef ksdef = new KsDef();
+
+        // column family for standard columns
+        CfDef standardCfDef = new CfDef(keyspace, columnFamily);
+        Map<String, String> compressionOptions = new HashMap<String, String>();
+        if (compression != null)
+            compressionOptions.put("sstable_compression", compression);
+
+        String comparator = settings.columns.comparator;
+        standardCfDef.setComparator_type(comparator)
+                     .setDefault_validation_class(DEFAULT_VALIDATOR)
+                     .setCompression_options(compressionOptions);
+
+        if (!settings.columns.useTimeUUIDComparator)
+        {
+            for (int i = 0; i < settings.columns.maxColumnsPerKey; i++)
+            {
+                standardCfDef.addToColumn_metadata(new ColumnDef(ByteBufferUtil.bytes("C" + i), "BytesType"));
+            }
+        }
+
+        if (indexType != null)
+        {
+            // NOTE(review): this looks like it replaces the column metadata added above with
+            // just the indexed column - confirm that is intended
+            ColumnDef standardColumn = new ColumnDef(ByteBufferUtil.bytes("C1"), "BytesType");
+            standardColumn.setIndex_type(indexType).setIndex_name("Idx1");
+            standardCfDef.setColumn_metadata(Arrays.asList(standardColumn));
+        }
+
+        // column family with super columns
+        CfDef superCfDef = new CfDef(keyspace, "Super1")
+                               .setColumn_type("Super");
+        superCfDef.setComparator_type(DEFAULT_COMPARATOR)
+                  .setSubcomparator_type(comparator)
+                  .setDefault_validation_class(DEFAULT_VALIDATOR)
+                  .setCompression_options(compressionOptions);
+
+        // column family for standard counters
+        CfDef counterCfDef = new CfDef(keyspace, "Counter1")
+                                 .setComparator_type(comparator)
+                                 .setDefault_validation_class("CounterColumnType")
+                                 .setReplicate_on_write(replicateOnWrite)
+                                 .setCompression_options(compressionOptions);
+
+        // column family with counter super columns
+        CfDef counterSuperCfDef = new CfDef(keyspace, "SuperCounter1")
+                                      .setComparator_type(comparator)
+                                      .setDefault_validation_class("CounterColumnType")
+                                      .setReplicate_on_write(replicateOnWrite)
+                                      .setColumn_type("Super")
+                                      .setCompression_options(compressionOptions);
+
+        ksdef.setName(keyspace);
+        ksdef.setStrategy_class(replicationStrategy);
+
+        if (!replicationStrategyOptions.isEmpty())
+        {
+            ksdef.setStrategy_options(replicationStrategyOptions);
+        }
+
+        if (compactionStrategy != null)
+        {
+            standardCfDef.setCompaction_strategy(compactionStrategy);
+            superCfDef.setCompaction_strategy(compactionStrategy);
+            counterCfDef.setCompaction_strategy(compactionStrategy);
+            counterSuperCfDef.setCompaction_strategy(compactionStrategy);
+        }
+
+        ksdef.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef, counterCfDef, counterSuperCfDef)));
+
+        Cassandra.Client client = settings.getRawThriftClient(false);
+
+        try
+        {
+            client.system_add_keyspace(ksdef);
+
+            /* CQL3 counter cf */
+            client.set_cql_version("3.0.0"); // just to create counter cf for cql3
+
+            client.set_keyspace(keyspace);
+            client.execute_cql3_query(createCounterCFStatementForCQL3(settings), Compression.NONE, ConsistencyLevel.ONE);
+
+            if (settings.mode.cqlVersion.isCql())
+                client.set_cql_version(settings.mode.cqlVersion.connectVersion);
+            /* end */
+
+            System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", settings.node.nodes.size()));
+            Thread.sleep(settings.node.nodes.size() * 1000); // seconds
+        }
+        catch (InvalidRequestException e)
+        {
+            System.err.println("Unable to create stress keyspace: " + e.getWhy());
+        }
+        catch (InterruptedException e)
+        {
+            // do not swallow the interrupt: restore the flag so callers further up can react to it
+            Thread.currentThread().interrupt();
+            System.err.println("!!!! " + e.getMessage());
+        }
+        catch (Exception e)
+        {
+            System.err.println("!!!! " + e.getMessage());
+        }
+    }
+
+    /**
+     * Builds the CQL3 statement that creates the "Counter3" table: a blob primary key followed by
+     * one counter column (c0, c1, ...) per configured column.
+     *
+     * The separator is written before each column rather than after it, so a configuration with
+     * zero columns does not leave a dangling comma before the closing parenthesis.
+     *
+     * @param options the stress settings; columns.maxColumnsPerKey gives the number of counter columns
+     * @return the statement text as bytes
+     */
+    private ByteBuffer createCounterCFStatementForCQL3(StressSettings options)
+    {
+        StringBuilder counter3 = new StringBuilder("CREATE TABLE \"Counter3\" (KEY blob PRIMARY KEY");
+
+        for (int i = 0; i < options.columns.maxColumnsPerKey; i++)
+            counter3.append(", c").append(i).append(" counter");
+        counter3.append(");");
+
+        return ByteBufferUtil.bytes(counter3.toString());
+    }
+
+ // Option Declarations
+
+ private static final class Options extends GroupedOptions
+ {
+ final OptionReplication replication = new OptionReplication();
+ final OptionSimple index = new OptionSimple("index=", "KEYS|CUSTOM|COMPOSITES", null, "Type of index to create on needed column families (KEYS)", false);
+ final OptionSimple keyspace = new OptionSimple("keyspace=", ".*", "Keyspace1", "The keyspace name to use", false);
+ final OptionSimple columnFamily = new OptionSimple("columnfamily=", ".*", "Standard1", "The column family name to use", false);
+ final OptionSimple compactionStrategy = new OptionSimple("compaction=", ".*", null, "The compaction strategy to use", false);
+ final OptionSimple noReplicateOnWrite = new OptionSimple("no-replicate-on-write", "", null, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work", false);
+ final OptionSimple compression = new OptionSimple("compression=", ".*", null, "Specify the compression to use for sstable, default:no compression", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(replication, index, keyspace, columnFamily, compactionStrategy, noReplicateOnWrite, compression);
+ }
+ }
+
+ // CLI Utility Methods
+
+ public static SettingsSchema get(Map<String, String[]> clArgs)
+ {
+ String[] params = clArgs.remove("-schema");
+ if (params == null)
+ return new SettingsSchema(new Options());
+
+ GroupedOptions options = GroupedOptions.select(params, new Options());
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -schema options provided, see output for valid options");
+ System.exit(1);
+ }
+ return new SettingsSchema((Options) options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-schema", new Options());
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
new file mode 100644
index 0000000..3cb0402
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
@@ -0,0 +1,121 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * Settings for the Thrift transport used by the stress tool.
+ *
+ * Only the fully-qualified name of the transport factory class is kept as
+ * serializable state; the factory instance itself is created on demand by
+ * {@link #getFactory()}.
+ */
+public class SettingsTransport implements Serializable
+{
+
+    private final String fqFactoryClass;
+
+    // Created lazily by getFactory(). Marked transient so that serializing these
+    // settings never attempts to write the factory object; it is simply
+    // re-created from fqFactoryClass on first use after deserialization.
+    private transient TTransportFactory factory;
+
+    /**
+     * Validates that the configured factory class exists, is a
+     * {@link TTransportFactory} and can be instantiated with a no-arg constructor.
+     *
+     * @param options the parsed "-transport" options
+     * @throws UnsupportedOperationException if SSL options are supplied (not yet supported)
+     * @throws IllegalArgumentException if the factory class cannot be loaded,
+     *         is not a TTransportFactory, or cannot be instantiated
+     */
+    public SettingsTransport(TOptions options)
+    {
+        if (options instanceof SSLOptions)
+            throw new UnsupportedOperationException("SSL transport options are not yet supported");
+
+        this.fqFactoryClass = options.factory.value();
+        try
+        {
+            Class<?> clazz = Class.forName(fqFactoryClass);
+            if (!TTransportFactory.class.isAssignableFrom(clazz))
+                throw new ClassCastException(fqFactoryClass + " is not a " + TTransportFactory.class.getName());
+            // check we can instantiate it
+            clazz.newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new IllegalArgumentException("Invalid transport factory class: " + fqFactoryClass, e);
+        }
+    }
+
+    /**
+     * Returns the transport factory, instantiating it reflectively on first use.
+     *
+     * @return the shared factory instance for these settings
+     * @throws RuntimeException wrapping any reflection failure
+     */
+    public synchronized TTransportFactory getFactory()
+    {
+        if (factory == null)
+        {
+            try
+            {
+                this.factory = (TTransportFactory) Class.forName(fqFactoryClass).newInstance();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        return factory;
+    }
+
+    // Option Declarations
+
+    static class TOptions extends GroupedOptions
+    {
+        final OptionSimple factory = new OptionSimple("factory=", ".*", "org.apache.cassandra.cli.transport.FramedTransportFactory", "Fully-qualified TTransportFactory class name for creating a connection. Note: For Thrift over SSL, use org.apache.cassandra.stress.SSLTransportFactory.", false);
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return Arrays.asList(factory);
+        }
+    }
+
+    static final class SSLOptions extends TOptions
+    {
+        final OptionSimple trustStore = new OptionSimple("truststore=", ".*", null, "SSL: full path to truststore", false);
+        final OptionSimple trustStorePw = new OptionSimple("truststore-password=", ".*", null, "SSL: truststore password", false);
+        final OptionSimple protocol = new OptionSimple("ssl-protocol=", ".*", "TLS", "SSL: connections protocol to use", false);
+        final OptionSimple alg = new OptionSimple("ssl-alg=", ".*", "SunX509", "SSL: algorithm", false);
+        // The store type names a keystore format, so the default is the JDK's JKS
+        // rather than a protocol name, and the help text describes the store type.
+        final OptionSimple storeType = new OptionSimple("store-type=", ".*", "JKS", "SSL: type of store", false);
+        // NOTE(review): "TLS" is a protocol name, not a cipher suite; confirm the intended default.
+        final OptionSimple ciphers = new OptionSimple("ssl-ciphers=", ".*", "TLS", "SSL: comma delimited list of encryption suites to use", false);
+
+        @Override
+        public List<? extends Option> options()
+        {
+            return Arrays.asList(factory, trustStore, trustStorePw, protocol, alg, storeType, ciphers);
+        }
+    }
+
+    // CLI Utility Methods
+
+    /**
+     * Builds the transport settings from the parsed command line.
+     *
+     * The "-transport" entry is removed from {@code clArgs}; defaults are used
+     * when it is absent. If the parameters are rejected, the help text and an
+     * error message are printed and the JVM exits with status 1.
+     *
+     * @param clArgs mutable map of command line groups to their parameters
+     * @return the transport settings
+     */
+    public static SettingsTransport get(Map<String, String[]> clArgs)
+    {
+        String[] params = clArgs.remove("-transport");
+        if (params == null)
+            return new SettingsTransport(new TOptions());
+
+        GroupedOptions options = GroupedOptions.select(params, new TOptions());
+        if (options == null)
+        {
+            printHelp();
+            System.out.println("Invalid -transport options provided, see output for valid options");
+            System.exit(1);
+        }
+        return new SettingsTransport((TOptions) options);
+    }
+
+    /** Prints the options accepted by the "-transport" group to standard output. */
+    public static void printHelp()
+    {
+        GroupedOptions.printOptions(System.out, "-transport", new TOptions());
+    }
+
+    /**
+     * @return a Runnable that prints the "-transport" help when run
+     */
+    public static Runnable helpPrinter()
+    {
+        return new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                printHelp();
+            }
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
new file mode 100644
index 0000000..ec4db96
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -0,0 +1,239 @@
+package org.apache.cassandra.stress.settings;
+
+import java.io.Serializable;
+import java.util.*;
+
+import com.datastax.driver.core.Metadata;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.SimpleThriftClient;
+import org.apache.cassandra.stress.util.SmartThriftClient;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * Aggregates every group of stress tool settings parsed from the command line
+ * and provides factory methods for the client connections derived from them.
+ */
+public class StressSettings implements Serializable
+{
+
+    public final SettingsCommand command;
+    public final SettingsRate rate;
+    public final SettingsKey keys;
+    public final SettingsColumn columns;
+    public final SettingsLog log;
+    public final SettingsMode mode;
+    public final SettingsNode node;
+    public final SettingsSchema schema;
+    public final SettingsTransport transport;
+    public final SettingsPort port;
+    public final String sendToDaemon;
+
+    public StressSettings(SettingsCommand command, SettingsRate rate, SettingsKey keys, SettingsColumn columns, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
+    {
+        this.command = command;
+        this.rate = rate;
+        this.keys = keys;
+        this.columns = columns;
+        this.log = log;
+        this.mode = mode;
+        this.node = node;
+        this.schema = schema;
+        this.transport = transport;
+        this.port = port;
+        this.sendToDaemon = sendToDaemon;
+    }
+
+    /**
+     * @return a new SmartThriftClient built from the cluster metadata of the
+     *         shared Java driver client and the configured keyspace
+     */
+    public SmartThriftClient getSmartThriftClient()
+    {
+        Metadata metadata = getJavaDriverClient().getCluster().getMetadata();
+        return new SmartThriftClient(this, schema.keyspace, metadata);
+    }
+
+    /**
+     * Thrift client connection
+     * @return cassandra client connection to a randomly chosen node, with the keyspace set
+     */
+    public SimpleThriftClient getThriftClient()
+    {
+        return new SimpleThriftClient(getRawThriftClient(node.randomNode(), true));
+    }
+
+    public Cassandra.Client getRawThriftClient(boolean setKeyspace)
+    {
+        return getRawThriftClient(node.randomNode(), setKeyspace);
+    }
+
+    public Cassandra.Client getRawThriftClient(String host)
+    {
+        return getRawThriftClient(host, true);
+    }
+
+    /**
+     * Opens a raw Thrift connection to the given host.
+     *
+     * @param host the node to connect to
+     * @param setKeyspace whether to switch the connection to the configured keyspace
+     * @return an open Thrift client
+     * @throws RuntimeException wrapping any failure; the underlying exception is kept as the cause
+     */
+    public Cassandra.Client getRawThriftClient(String host, boolean setKeyspace)
+    {
+        TSocket socket = new TSocket(host, port.thriftPort);
+        TTransport opened = null;
+        boolean ready = false;
+
+        try
+        {
+            opened = this.transport.getFactory().getTransport(socket);
+            opened.open();
+
+            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(opened));
+
+            if (mode.cqlVersion.isCql())
+                client.set_cql_version(mode.cqlVersion.connectVersion);
+
+            // use the keyspace chosen via -schema rather than a hard-coded name,
+            // consistent with getSmartThriftClient()
+            if (setKeyspace)
+                client.set_keyspace(schema.keyspace);
+
+            ready = true;
+            return client;
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e.getWhy(), e);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        finally
+        {
+            // do not leak the connection when setup fails part-way through
+            if (!ready && opened != null)
+                opened.close();
+        }
+    }
+
+
+    /**
+     * @return a new native protocol client connected to a randomly chosen node
+     *         and switched to the configured keyspace
+     * @throws RuntimeException wrapping any failure; the underlying exception is kept as the cause
+     */
+    public SimpleClient getSimpleNativeClient()
+    {
+        try
+        {
+            String currentNode = node.randomNode();
+            SimpleClient client = new SimpleClient(currentNode, port.nativePort);
+            client.connect(false);
+            client.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
+            return client;
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    // Shared by every StressSettings instance, so it is guarded by the class
+    // monitor rather than by any single instance's monitor.
+    private static volatile JavaDriverClient client;
+
+    /**
+     * Returns the process-wide Java driver client, connecting it on first use.
+     *
+     * @return the shared, connected client
+     * @throws RuntimeException wrapping any connection failure
+     */
+    public JavaDriverClient getJavaDriverClient()
+    {
+        JavaDriverClient existing = client;
+        if (existing != null)
+            return existing;
+
+        synchronized (StressSettings.class)
+        {
+            // re-check now that we hold the lock; another thread may have connected
+            if (client != null)
+                return client;
+
+            try
+            {
+                String currentNode = node.randomNode();
+                JavaDriverClient c = new JavaDriverClient(currentNode, port.nativePort);
+                c.connect(mode.compression());
+                c.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
+                return client = c;
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Creates the keyspaces when the command is a write or counter write. */
+    public void maybeCreateKeyspaces()
+    {
+        if (command.type == Command.WRITE || command.type == Command.COUNTERWRITE)
+            schema.createKeySpaces(this);
+
+    }
+
+    /**
+     * Parses the raw command line. A first argument of "legacy" delegates the
+     * remaining arguments to the legacy parser; special commands handled by
+     * {@code SettingsMisc.maybeDoSpecial} terminate the JVM.
+     *
+     * @param args the raw command line arguments
+     * @return the parsed settings
+     */
+    public static StressSettings parse(String[] args)
+    {
+        final Map<String, String[]> clArgs = parseMap(args);
+        if (clArgs.containsKey("legacy"))
+            return Legacy.build(Arrays.copyOfRange(args, 1, args.length));
+        if (SettingsMisc.maybeDoSpecial(clArgs))
+            System.exit(1);
+        return get(clArgs);
+    }
+
+    /**
+     * Builds the settings from the grouped command line arguments. Each settings
+     * group removes the entries it consumes; anything left over is reported as
+     * an error and the JVM exits with status 1.
+     *
+     * @param clArgs mutable map of command line groups to their parameters
+     * @return the assembled settings
+     * @throws IllegalArgumentException if no command was specified
+     */
+    public static StressSettings get(Map<String, String[]> clArgs)
+    {
+        SettingsCommand command = SettingsCommand.get(clArgs);
+        if (command == null)
+            throw new IllegalArgumentException("No command specified");
+        String sendToDaemon = SettingsMisc.getSendToDaemon(clArgs);
+        SettingsPort port = SettingsPort.get(clArgs);
+        SettingsRate rate = SettingsRate.get(clArgs, command);
+        SettingsKey keys = SettingsKey.get(clArgs, command);
+        SettingsColumn columns = SettingsColumn.get(clArgs);
+        SettingsLog log = SettingsLog.get(clArgs);
+        SettingsMode mode = SettingsMode.get(clArgs);
+        SettingsNode node = SettingsNode.get(clArgs);
+        SettingsSchema schema = SettingsSchema.get(clArgs);
+        SettingsTransport transport = SettingsTransport.get(clArgs);
+        if (!clArgs.isEmpty())
+        {
+            printHelp();
+            System.out.println("Error processing command line arguments. The following were ignored:");
+            for (Map.Entry<String, String[]> e : clArgs.entrySet())
+            {
+                System.out.print(e.getKey());
+                for (String v : e.getValue())
+                {
+                    System.out.print(" ");
+                    System.out.print(v);
+                }
+                System.out.println();
+            }
+            System.exit(1);
+        }
+        return new StressSettings(command, rate, keys, columns, log, mode, node, schema, transport, port, sendToDaemon);
+    }
+
+    /**
+     * Groups the raw arguments: the first argument and every argument starting
+     * with "-" opens a new group (keyed by its lower-cased text); the arguments
+     * that follow become that group's parameters. Insertion order is preserved.
+     */
+    private static Map<String, String[]> parseMap(String[] args)
+    {
+        // first is the main command/operation, so specified without a -
+        if (args.length == 0)
+        {
+            System.out.println("No command provided");
+            printHelp();
+            System.exit(1);
+        }
+        final LinkedHashMap<String, String[]> r = new LinkedHashMap<>();
+        String key = null;
+        List<String> params = new ArrayList<>();
+        for (int i = 0 ; i < args.length ; i++)
+        {
+            if (i == 0 || args[i].startsWith("-"))
+            {
+                if (i > 0)
+                    r.put(key, params.toArray(new String[0]));
+                // Locale.ROOT keeps option matching independent of the user's locale
+                key = args[i].toLowerCase(Locale.ROOT);
+                params.clear();
+            }
+            else
+                params.add(args[i]);
+        }
+        r.put(key, params.toArray(new String[0]));
+        return r;
+    }
+
+    public static void printHelp()
+    {
+        SettingsMisc.printHelp();
+    }
+
+    /** Disconnects and forgets the shared Java driver client, if one was created. */
+    public void disconnect()
+    {
+        // same lock as getJavaDriverClient(): the client is static state
+        synchronized (StressSettings.class)
+        {
+            JavaDriverClient current = client;
+            if (current == null)
+                return;
+
+            current.disconnect();
+            client = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
deleted file mode 100644
index 5136a55..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.util;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.cassandra.thrift.Cassandra.Client;
-import org.apache.thrift.protocol.TProtocol;
-
-public class CassandraClient extends Client
-{
- public Map<Integer, Integer> preparedStatements = new HashMap<Integer, Integer>();
-
- public CassandraClient(TProtocol protocol)
- {
- super(protocol);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
new file mode 100644
index 0000000..f13c1b6
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.util;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import javax.net.ssl.SSLContext;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Slf4JLoggerFactory;
+
+/**
+ * Thin wrapper around the DataStax Java driver's Cluster/Session pair, used by
+ * the stress tool to run simple and prepared statements against one contact point.
+ */
+public class JavaDriverClient
+{
+
+    static
+    {
+        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+    }
+
+    public final String host;
+    public final int port;
+    private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
+    // both are assigned by connect(); null until then
+    private Cluster cluster;
+    private Session session;
+
+    public JavaDriverClient(String host, int port)
+    {
+        this(host, port, new EncryptionOptions.ClientEncryptionOptions());
+    }
+
+    public JavaDriverClient(String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+    {
+        this.host = host;
+        this.port = port;
+        this.encryptionOptions = encryptionOptions;
+    }
+
+    public PreparedStatement prepare(String query)
+    {
+        return getSession().prepare(query);
+    }
+
+    /**
+     * Builds the cluster for the configured contact point, prints the cluster
+     * name and the hosts it reports, then opens a session.
+     *
+     * @param compression the protocol compression to request
+     * @throws Exception if the SSL context cannot be created or the connection fails
+     */
+    public void connect(ProtocolOptions.Compression compression) throws Exception
+    {
+        Cluster.Builder clusterBuilder = Cluster.builder()
+                                                .addContactPoint(host).withPort(port);
+        clusterBuilder.withCompression(compression);
+        if (encryptionOptions.enabled)
+        {
+            SSLContext sslContext = SSLFactory.createSSLContext(encryptionOptions, true);
+            SSLOptions sslOptions = new SSLOptions(sslContext, encryptionOptions.cipher_suites);
+            clusterBuilder.withSSL(sslOptions);
+        }
+        cluster = clusterBuilder.build();
+        Metadata metadata = cluster.getMetadata();
+        System.out.printf("Connected to cluster: %s\n",
+                          metadata.getClusterName());
+        // named 'member' so it does not shadow the 'host' field
+        for (Host member : metadata.getAllHosts())
+        {
+            System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
+                              member.getDatacenter(), member.getAddress(), member.getRack());
+        }
+
+        session = cluster.connect();
+    }
+
+    public Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    public Session getSession()
+    {
+        return session;
+    }
+
+    /**
+     * Executes a query string at the given consistency level.
+     *
+     * @param query the CQL text to run
+     * @param consistency the Cassandra consistency level, translated to the driver's equivalent
+     * @return the driver's result set
+     */
+    public ResultSet execute(String query, org.apache.cassandra.db.ConsistencyLevel consistency)
+    {
+        SimpleStatement stmt = new SimpleStatement(query);
+        stmt.setConsistencyLevel(from(consistency));
+        return getSession().execute(stmt);
+    }
+
+    /**
+     * Binds the parameters to the prepared statement and executes it.
+     * Note that the consistency level is set on the prepared statement itself.
+     *
+     * @param stmt the prepared statement
+     * @param queryParams the values to bind, in order
+     * @param consistency the Cassandra consistency level, translated to the driver's equivalent
+     * @return the driver's result set
+     */
+    public ResultSet executePrepared(PreparedStatement stmt, List<ByteBuffer> queryParams, org.apache.cassandra.db.ConsistencyLevel consistency)
+    {
+
+        stmt.setConsistencyLevel(from(consistency));
+        BoundStatement bstmt = stmt.bind(queryParams.toArray(new ByteBuffer[queryParams.size()]));
+        return getSession().execute(bstmt);
+    }
+
+    /**
+     * Get ConsistencyLevel from a C* ConsistencyLevel. This exists in the Java Driver ConsistencyLevel,
+     * but it is not public.
+     *
+     * @param cl the Cassandra consistency level to translate
+     * @return the Java driver consistency level with the same name
+     * @throws AssertionError if the level has no mapping in this method
+     */
+    ConsistencyLevel from(org.apache.cassandra.db.ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ANY:
+                return com.datastax.driver.core.ConsistencyLevel.ANY;
+            case ONE:
+                return com.datastax.driver.core.ConsistencyLevel.ONE;
+            case TWO:
+                return com.datastax.driver.core.ConsistencyLevel.TWO;
+            case THREE:
+                return com.datastax.driver.core.ConsistencyLevel.THREE;
+            case QUORUM:
+                return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+            case ALL:
+                return com.datastax.driver.core.ConsistencyLevel.ALL;
+            case LOCAL_QUORUM:
+                return com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM;
+            case EACH_QUORUM:
+                return com.datastax.driver.core.ConsistencyLevel.EACH_QUORUM;
+        }
+        throw new AssertionError("Unsupported consistency level: " + cl);
+    }
+
+    /**
+     * Shuts the cluster down and waits for the shutdown to complete.
+     * Does nothing if {@link #connect} never built a cluster.
+     */
+    public void disconnect()
+    {
+        if (cluster == null)
+            return;
+        FBUtilities.waitOnFuture(cluster.shutdown());
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
deleted file mode 100644
index e2e12f8..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.util;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Map;
-import java.util.HashMap;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.stress.Session;
-import org.apache.cassandra.stress.Stress;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.thrift.Compression;
-import org.apache.cassandra.thrift.CqlPreparedResult;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.UUIDGen;
-
-public abstract class Operation
-{
- public final int index;
-
- protected final Session session;
- protected static volatile Double nextGaussian = null;
-
- public Operation(int idx)
- {
- index = idx;
- session = Stress.session;
- }
-
- public Operation(Session client, int idx)
- {
- index = idx;
- session = client;
- }
-
- /**
- * Run operation
- * @param client Cassandra Thrift client connection
- * @throws IOException on any I/O error.
- */
- public abstract void run(CassandraClient client) throws IOException;
-
- public void run(SimpleClient client) throws IOException {}
-
- // Utility methods
-
- protected List<ByteBuffer> generateValues()
- {
- if (session.averageSizeValues)
- {
- return generateRandomizedValues();
- }
-
- List<ByteBuffer> values = new ArrayList<ByteBuffer>();
-
- for (int i = 0; i < session.getCardinality(); i++)
- {
- String hash = getMD5(Integer.toString(i));
- int times = session.getColumnSize() / hash.length();
- int sumReminder = session.getColumnSize() % hash.length();
-
- String value = multiplyString(hash, times) + hash.substring(0, sumReminder);
- values.add(ByteBuffer.wrap(value.getBytes()));
- }
-
- return values;
- }
-
- /**
- * Generate values of average size specified by -S, up to cardinality specified by -C
- * @return Collection of the values
- */
- protected List<ByteBuffer> generateRandomizedValues()
- {
- List<ByteBuffer> values = new ArrayList<ByteBuffer>();
-
- int limit = 2 * session.getColumnSize();
-
- for (int i = 0; i < session.getCardinality(); i++)
- {
- byte[] value = new byte[Stress.randomizer.nextInt(limit)];
- Stress.randomizer.nextBytes(value);
- values.add(ByteBuffer.wrap(value));
- }
-
- return values;
- }
-
- /**
- * key generator using Gauss or Random algorithm
- * @return byte[] representation of the key string
- */
- protected byte[] generateKey()
- {
- return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
- }
-
- /**
- * Random key generator
- * @return byte[] representation of the key string
- */
- private byte[] generateRandomKey()
- {
- String format = "%0" + session.getTotalKeysLength() + "d";
- return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8);
- }
-
- /**
- * Gauss key generator
- * @return byte[] representation of the key string
- */
- private byte[] generateGaussKey()
- {
- String format = "%0" + session.getTotalKeysLength() + "d";
-
- for (;;)
- {
- double token = nextGaussian(session.getMean(), session.getSigma());
-
- if (0 <= token && token < session.getNumDifferentKeys())
- {
- return String.format(format, (int) token).getBytes(UTF_8);
- }
- }
- }
-
- /**
- * Gaussian distribution.
- * @param mu is the mean
- * @param sigma is the standard deviation
- *
- * @return next Gaussian distribution number
- */
- private static double nextGaussian(int mu, float sigma)
- {
- Random random = Stress.randomizer;
-
- Double currentState = nextGaussian;
- nextGaussian = null;
-
- if (currentState == null)
- {
- double x2pi = random.nextDouble() * 2 * Math.PI;
- double g2rad = Math.sqrt(-2.0 * Math.log(1.0 - random.nextDouble()));
-
- currentState = Math.cos(x2pi) * g2rad;
- nextGaussian = Math.sin(x2pi) * g2rad;
- }
-
- return mu + currentState * sigma;
- }
-
- /**
- * MD5 string generation
- * @param input String
- * @return md5 representation of the string
- */
- private String getMD5(String input)
- {
- MessageDigest md = FBUtilities.threadLocalMD5Digest();
- byte[] messageDigest = md.digest(input.getBytes(UTF_8));
- StringBuilder hash = new StringBuilder(new BigInteger(1, messageDigest).toString(16));
-
- while (hash.length() < 32)
- hash.append("0").append(hash);
-
- return hash.toString();
- }
-
- /**
- * Equal to python/ruby - 's' * times
- * @param str String to multiple
- * @param times multiplication times
- * @return multiplied string
- */
- private String multiplyString(String str, int times)
- {
- StringBuilder result = new StringBuilder();
-
- for (int i = 0; i < times; i++)
- result.append(str);
-
- return result.toString();
- }
-
- protected ByteBuffer columnName(int index, boolean timeUUIDComparator)
- {
- return timeUUIDComparator
- ? TimeUUIDType.instance.decompose(UUIDGen.getTimeUUID())
- : ByteBufferUtil.bytes(String.format("C%d", index));
- }
-
- protected String getExceptionMessage(Exception e)
- {
- String className = e.getClass().getSimpleName();
- String message = (e instanceof InvalidRequestException) ? ((InvalidRequestException) e).getWhy() : e.getMessage();
- return (message == null) ? "(" + className + ")" : String.format("(%s): %s", className, message);
- }
-
- protected void error(String message) throws IOException
- {
- if (!session.ignoreErrors())
- throw new IOException(message);
- else
- System.err.println(message);
- }
-
- protected String getUnQuotedCqlBlob(String term, boolean isCQL3)
- {
- return getUnQuotedCqlBlob(term.getBytes(), isCQL3);
- }
-
- protected String getUnQuotedCqlBlob(byte[] term, boolean isCQL3)
- {
- return isCQL3
- ? "0x" + Hex.bytesToHex(term)
- : Hex.bytesToHex(term);
- }
-
- protected List<ByteBuffer> queryParamsAsByteBuffer(List<String> queryParams)
- {
- return Lists.transform(queryParams, new Function<String, ByteBuffer>()
- {
- public ByteBuffer apply(String param)
- {
- if (param.startsWith("0x"))
- param = param.substring(2);
- return ByteBufferUtil.hexToBytes(param);
- }
- });
- }
-
- /**
- * Constructs a CQL query string by replacing instances of the character
- * '?', with the corresponding parameter.
- *
- * @param query base query string to format
- * @param parms sequence of string query parameters
- * @return formatted CQL query string
- */
- protected static String formatCqlQuery(String query, List<String> parms)
- {
- int marker, position = 0;
- StringBuilder result = new StringBuilder();
-
- if (-1 == (marker = query.indexOf('?')) || parms.size() == 0)
- return query;
-
- for (String parm : parms)
- {
- result.append(query.substring(position, marker));
- result.append(parm);
-
- position = marker + 1;
- if (-1 == (marker = query.indexOf('?', position + 1)))
- break;
- }
-
- if (position < query.length())
- result.append(query.substring(position));
-
- return result.toString();
- }
-
- protected Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception
- {
- Integer statementId = client.preparedStatements.get(cqlQuery.hashCode());
- if (statementId == null)
- {
- CqlPreparedResult response = session.cqlVersion.startsWith("3")
- ? client.prepare_cql3_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE)
- : client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE);
- statementId = response.itemId;
- client.preparedStatements.put(cqlQuery.hashCode(), statementId);
- }
-
- return statementId;
- }
-
- private static final Map<Integer, byte[]> preparedStatementsNative = new HashMap<Integer, byte[]>();
-
- protected static byte[] getPreparedStatement(SimpleClient client, String cqlQuery) throws Exception
- {
- byte[] statementId = preparedStatementsNative.get(cqlQuery.hashCode());
- if (statementId == null)
- {
- statementId = client.prepare(cqlQuery).statementId.bytes;
- preparedStatementsNative.put(cqlQuery.hashCode(), statementId);
- }
- return statementId;
- }
-
- protected String wrapInQuotesIfRequired(String string)
- {
- return session.cqlVersion.startsWith("3")
- ? "\"" + string + "\""
- : string;
- }
-
- public interface CQLQueryExecutor
- {
- public boolean execute(String query, List<String> queryParameters) throws Exception;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java b/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java
new file mode 100644
index 0000000..af1381b
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/SampleOfLongs.java
@@ -0,0 +1,107 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+// represents a sample of long (latencies) together with the probability of selection of each sample (i.e. the ratio of
+// samples to total number of events). This is used to ensure that, when merging, the result has samples from each
+// with equal probability
+public final class SampleOfLongs
+{
+
+    // nanos; medianLatency() and rankLatency() index into this array by position,
+    // so it must be in ascending order for their results to be quantiles
+    final long[] sample;
+
+    // probability with which each sample was selected
+    final double p;
+
+    /**
+     * @param sample the sampled values
+     * @param p the inverse selection probability, i.e. each value was kept with probability 1/p
+     */
+    SampleOfLongs(long[] sample, int p)
+    {
+        this.sample = sample;
+        // divide in double precision: the field is a double, so a float
+        // intermediate would needlessly discard precision
+        this.p = 1 / (double) p;
+    }
+
+    /**
+     * @param sample the sampled values
+     * @param p the probability with which each value was selected
+     */
+    SampleOfLongs(long[] sample, double p)
+    {
+        this.sample = sample;
+        this.p = p;
+    }
+
+    /**
+     * Merges several samples into one in which every value has been retained
+     * with the same probability. Each input is thinned down to the smallest
+     * selection probability among the inputs; if more than {@code maxSamples}
+     * values remain, the result is subsampled to exactly {@code maxSamples}.
+     *
+     * @param rnd source of randomness
+     * @param merge the samples to combine
+     * @param maxSamples upper bound on the size of the result
+     * @return a new, sorted sample with its selection probability
+     */
+    static SampleOfLongs merge(Random rnd, List<SampleOfLongs> merge, int maxSamples)
+    {
+        int maxLength = 0;
+        double targetp = 1;
+        for (SampleOfLongs sampleOfLongs : merge)
+        {
+            maxLength += sampleOfLongs.sample.length;
+            targetp = Math.min(targetp, sampleOfLongs.p);
+        }
+        // sized for the worst case; only the first 'count' slots end up populated
+        long[] sample = new long[maxLength];
+        int count = 0;
+        for (SampleOfLongs latencies : merge)
+        {
+            long[] in = latencies.sample;
+            // keep each value with the probability that brings this input down to targetp
+            double p = targetp / latencies.p;
+            for (int i = 0 ; i < in.length ; i++)
+                if (rnd.nextDouble() < p)
+                    sample[count++] = in[i];
+        }
+        if (count > maxSamples)
+        {
+            targetp = subsample(rnd, maxSamples, sample, count, targetp);
+            count = maxSamples;
+        }
+        sample = Arrays.copyOf(sample, count);
+        Arrays.sort(sample);
+        return new SampleOfLongs(sample, targetp);
+    }
+
+    /**
+     * Returns a random subsample of exactly {@code maxSamples} values, or this
+     * instance unchanged when it holds fewer than {@code maxSamples} values.
+     *
+     * @param rnd source of randomness
+     * @param maxSamples the size of the subsample
+     * @return a sorted subsample with its adjusted selection probability
+     */
+    public SampleOfLongs subsample(Random rnd, int maxSamples)
+    {
+        if (maxSamples > sample.length)
+            return this;
+
+        long[] sample = this.sample.clone();
+        double p = subsample(rnd, maxSamples, sample, sample.length, this.p);
+        sample = Arrays.copyOf(sample, maxSamples);
+        // the shuffle above destroys ordering; restore it so that the
+        // positional lookups in medianLatency()/rankLatency() remain valid
+        Arrays.sort(sample);
+        return new SampleOfLongs(sample, p);
+    }
+
+    /**
+     * Moves a uniformly random selection of {@code maxSamples} of the first
+     * {@code count} values to the front of {@code sample} (a partial shuffle).
+     *
+     * @param count number of populated slots in {@code sample}; may be less than its length
+     * @param p selection probability of the values before subsampling
+     * @return the selection probability after subsampling
+     */
+    private static double subsample(Random rnd, int maxSamples, long[] sample, int count, double p)
+    {
+        // want exactly maxSamples, so select random indexes up to maxSamples
+        for (int i = 0 ; i < maxSamples ; i++)
+        {
+            int take = i + rnd.nextInt(count - i);
+            long tmp = sample[i];
+            sample[i] = sample[take];
+            sample[take] = tmp;
+        }
+
+        // calculate new p; have selected with probability maxSamples / count
+        // so multiply p by this probability. The divisor must be count, not
+        // sample.length: merge() passes an array larger than its populated prefix.
+        p *= maxSamples / (double) count;
+        return p;
+    }
+
+    /**
+     * @return the middle element of the sample scaled by 1e-6 (nanoseconds to
+     *         milliseconds, going by the field comment), or 0 for an empty sample
+     */
+    public double medianLatency()
+    {
+        if (sample.length == 0)
+            return 0;
+        return sample[sample.length >> 1] * 0.000001d;
+    }
+
+    /**
+     * @param rank the quantile to look up, expected to satisfy 0 < rank < 1;
+     *        values outside that range are clamped to the first/last element
+     * @return the element at that rank scaled by 1e-6, or 0 for an empty sample
+     */
+    // 0 < rank < 1
+    public double rankLatency(float rank)
+    {
+        if (sample.length == 0)
+            return 0;
+        int index = (int)(rank * sample.length);
+        if (index >= sample.length)
+            index = sample.length - 1;
+        if (index < 0)
+            index = 0;
+        return sample[index] * 0.000001d;
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/util/SimpleThriftClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/SimpleThriftClient.java b/tools/stress/src/org/apache/cassandra/stress/util/SimpleThriftClient.java
new file mode 100644
index 0000000..9e8b046
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/SimpleThriftClient.java
@@ -0,0 +1,90 @@
+package org.apache.cassandra.stress.util;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+
+/**
+ * {@link ThriftClient} implementation that forwards every call straight to a single
+ * wrapped {@link Cassandra.Client}.
+ *
+ * Several methods accept a {@code key} argument that this implementation does not
+ * pass on to the underlying client.
+ */
+public class SimpleThriftClient implements ThriftClient
+{
+
+    /** The Thrift client all operations are delegated to. */
+    final Cassandra.Client client;
+
+    /**
+     * @param client the Thrift client to delegate to
+     */
+    public SimpleThriftClient(Cassandra.Client client)
+    {
+        this.client = client;
+    }
+
+    /** Delegates to {@code Cassandra.Client.batch_mutate}. */
+    @Override
+    public void batch_mutate(Map<ByteBuffer, Map<String, List<Mutation>>> record, ConsistencyLevel consistencyLevel) throws TException
+    {
+        client.batch_mutate(record, consistencyLevel);
+    }
+
+    /** Delegates to {@code Cassandra.Client.get_slice}. */
+    @Override
+    public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        return client.get_slice(key, column_parent, predicate, consistency_level);
+    }
+
+    /** Delegates to {@code Cassandra.Client.get_indexed_slices}. */
+    @Override
+    public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        return client.get_indexed_slices(column_parent, index_clause, column_predicate, consistency_level);
+    }
+
+    /** Delegates to {@code Cassandra.Client.get_range_slices}. */
+    @Override
+    public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        return client.get_range_slices(column_parent, predicate, range, consistency_level);
+    }
+
+    /** Delegates to {@code Cassandra.Client.multiget_slice}. */
+    @Override
+    public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        return client.multiget_slice(keys, column_parent, predicate, consistency_level);
+    }
+
+    /** Delegates to {@code Cassandra.Client.insert}. */
+    @Override
+    public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        client.insert(key, column_parent, column, consistency_level);
+    }
+
+    /**
+     * Prepares a CQL3 statement on the wrapped client.
+     *
+     * @return the {@code itemId} of the prepared statement
+     */
+    @Override
+    public Integer prepare_cql3_query(String query, Compression compression) throws InvalidRequestException, TException
+    {
+        return client.prepare_cql3_query(ByteBufferUtil.bytes(query), compression).itemId;
+    }
+
+    /** Executes a prepared CQL statement; {@code key} is not forwarded to the client. */
+    @Override
+    public CqlResult execute_prepared_cql_query(int itemId, ByteBuffer key, List<ByteBuffer> values) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        return client.execute_prepared_cql_query(itemId, values);
+    }
+
+    /**
+     * Prepares a CQL statement on the wrapped client.
+     *
+     * @return the {@code itemId} of the prepared statement
+     */
+    @Override
+    public Integer prepare_cql_query(String query, Compression compression) throws InvalidRequestException, TException
+    {
+        return client.prepare_cql_query(ByteBufferUtil.bytes(query), compression).itemId;
+    }
+
+    /** Executes a CQL3 query string; {@code key} is not forwarded to the client. */
+    @Override
+    public CqlResult execute_cql3_query(String query, ByteBuffer key, Compression compression, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        return client.execute_cql3_query(ByteBufferUtil.bytes(query), compression, consistency);
+    }
+
+    /** Executes a prepared CQL3 statement; {@code key} is not forwarded to the client. */
+    @Override
+    public CqlResult execute_prepared_cql3_query(int itemId, ByteBuffer key, List<ByteBuffer> values, ConsistencyLevel consistency) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        return client.execute_prepared_cql3_query(itemId, values, consistency);
+    }
+
+    /** Executes a CQL query string; {@code key} is not forwarded to the client. */
+    @Override
+    public CqlResult execute_cql_query(String query, ByteBuffer key, Compression compression) throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        return client.execute_cql_query(ByteBufferUtil.bytes(query), compression);
+    }
+}