You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2013/12/24 03:08:45 UTC
[6/6] git commit: Improve Stress Tool patch by Benedict;
reviewed by Pavel Yaskevich for CASSANDRA-6199
Improve Stress Tool
patch by Benedict; reviewed by Pavel Yaskevich for CASSANDRA-6199
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e1e98ad
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e1e98ad
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e1e98ad
Branch: refs/heads/trunk
Commit: 2e1e98ad04c81900524763eddf560edc55dfb299
Parents: 34235ad
Author: belliottsmith <gi...@sub.laerad.com>
Authored: Tue Dec 24 00:33:38 2013 +0000
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Dec 23 18:05:43 2013 -0800
----------------------------------------------------------------------
build.xml | 6 +
lib/commons-math3-3.2.jar | Bin 0 -> 1692782 bytes
tools/bin/cassandra-stressd | 34 +-
...2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar | Bin 0 -> 5869229 bytes
...cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar | Bin 0 -> 490145 bytes
.../org/apache/cassandra/stress/Operation.java | 204 +++++
.../org/apache/cassandra/stress/Session.java | 841 -------------------
.../src/org/apache/cassandra/stress/Stress.java | 63 +-
.../apache/cassandra/stress/StressAction.java | 639 +++++++++-----
.../apache/cassandra/stress/StressMetrics.java | 178 ++++
.../apache/cassandra/stress/StressServer.java | 90 +-
.../cassandra/stress/StressStatistics.java | 126 ---
.../cassandra/stress/generatedata/DataGen.java | 18 +
.../stress/generatedata/DataGenBytesRandom.java | 24 +
.../stress/generatedata/DataGenFactory.java | 9 +
.../stress/generatedata/DataGenHex.java | 39 +
.../DataGenHexFromDistribution.java | 45 +
.../generatedata/DataGenHexFromOpIndex.java | 27 +
.../generatedata/DataGenStringDictionary.java | 84 ++
.../generatedata/DataGenStringRepeats.java | 69 ++
.../stress/generatedata/Distribution.java | 19 +
.../generatedata/DistributionBoundApache.java | 42 +
.../generatedata/DistributionFactory.java | 10 +
.../stress/generatedata/DistributionFixed.java | 25 +
.../generatedata/DistributionOffsetApache.java | 40 +
.../generatedata/DistributionSeqBatch.java | 47 ++
.../cassandra/stress/generatedata/KeyGen.java | 33 +
.../cassandra/stress/generatedata/RowGen.java | 31 +
.../generatedata/RowGenDistributedSize.java | 84 ++
.../stress/operations/CQLOperation.java | 96 ---
.../stress/operations/CounterAdder.java | 141 ----
.../stress/operations/CounterGetter.java | 152 ----
.../stress/operations/CqlCounterAdder.java | 98 +--
.../stress/operations/CqlCounterGetter.java | 98 +--
.../operations/CqlIndexedRangeSlicer.java | 224 ++---
.../stress/operations/CqlInserter.java | 124 +--
.../stress/operations/CqlMultiGetter.java | 19 +-
.../stress/operations/CqlOperation.java | 566 +++++++++++++
.../stress/operations/CqlRangeSlicer.java | 95 +--
.../cassandra/stress/operations/CqlReader.java | 121 +--
.../stress/operations/IndexedRangeSlicer.java | 135 ---
.../cassandra/stress/operations/Inserter.java | 135 ---
.../stress/operations/MultiGetter.java | 152 ----
.../stress/operations/RangeSlicer.java | 144 ----
.../cassandra/stress/operations/Reader.java | 159 ----
.../stress/operations/ThriftCounterAdder.java | 95 +++
.../stress/operations/ThriftCounterGetter.java | 75 ++
.../operations/ThriftIndexedRangeSlicer.java | 115 +++
.../stress/operations/ThriftInserter.java | 117 +++
.../stress/operations/ThriftMultiGetter.java | 81 ++
.../stress/operations/ThriftRangeSlicer.java | 86 ++
.../stress/operations/ThriftReader.java | 76 ++
.../cassandra/stress/server/StressThread.java | 77 --
.../cassandra/stress/settings/CliOption.java | 58 ++
.../cassandra/stress/settings/Command.java | 101 +++
.../stress/settings/CommandCategory.java | 8 +
.../stress/settings/ConnectionAPI.java | 7 +
.../stress/settings/ConnectionStyle.java | 9 +
.../cassandra/stress/settings/CqlVersion.java | 48 ++
.../stress/settings/GroupedOptions.java | 104 +++
.../cassandra/stress/settings/Legacy.java | 369 ++++++++
.../cassandra/stress/settings/Option.java | 24 +
.../stress/settings/OptionDataGen.java | 177 ++++
.../stress/settings/OptionDistribution.java | 340 ++++++++
.../cassandra/stress/settings/OptionMulti.java | 107 +++
.../stress/settings/OptionReplication.java | 114 +++
.../cassandra/stress/settings/OptionSimple.java | 131 +++
.../stress/settings/SettingsColumn.java | 176 ++++
.../stress/settings/SettingsCommand.java | 159 ++++
.../stress/settings/SettingsCommandMixed.java | 184 ++++
.../stress/settings/SettingsCommandMulti.java | 69 ++
.../cassandra/stress/settings/SettingsKey.java | 130 +++
.../cassandra/stress/settings/SettingsLog.java | 92 ++
.../cassandra/stress/settings/SettingsMisc.java | 200 +++++
.../cassandra/stress/settings/SettingsMode.java | 154 ++++
.../cassandra/stress/settings/SettingsNode.java | 103 +++
.../cassandra/stress/settings/SettingsPort.java | 70 ++
.../cassandra/stress/settings/SettingsRate.java | 116 +++
.../stress/settings/SettingsSchema.java | 236 ++++++
.../stress/settings/SettingsTransport.java | 121 +++
.../stress/settings/StressSettings.java | 239 ++++++
.../cassandra/stress/util/CassandraClient.java | 34 -
.../cassandra/stress/util/JavaDriverClient.java | 148 ++++
.../apache/cassandra/stress/util/Operation.java | 334 --------
.../cassandra/stress/util/SampleOfLongs.java | 107 +++
.../stress/util/SimpleThriftClient.java | 90 ++
.../stress/util/SmartThriftClient.java | 235 ++++++
.../cassandra/stress/util/ThriftClient.java | 36 +
.../org/apache/cassandra/stress/util/Timer.java | 129 +++
.../apache/cassandra/stress/util/Timing.java | 72 ++
.../cassandra/stress/util/TimingInterval.java | 132 +++
.../cassandra/stress/util/Uncertainty.java | 81 ++
92 files changed, 7692 insertions(+), 3360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 1ffb908..347323f 100644
--- a/build.xml
+++ b/build.xml
@@ -37,6 +37,7 @@
<property name="build.src.resources" value="${basedir}/src/resources"/>
<property name="build.src.gen-java" value="${basedir}/src/gen-java"/>
<property name="build.lib" value="${basedir}/lib"/>
+ <property name="build.tools.lib" value="${basedir}/tools/lib"/>
<property name="build.dir" value="${basedir}/build"/>
<property name="build.dir.lib" value="${basedir}/build/lib"/>
<property name="build.test.dir" value="${build.dir}/test"/>
@@ -343,6 +344,7 @@
<dependency groupId="commons-cli" artifactId="commons-cli" version="1.1"/>
<dependency groupId="commons-codec" artifactId="commons-codec" version="1.2"/>
<dependency groupId="org.apache.commons" artifactId="commons-lang3" version="3.1"/>
+ <dependency groupId="org.apache.commons" artifactId="commons-math3" version="3.2"/>
<dependency groupId="com.googlecode.concurrentlinkedhashmap" artifactId="concurrentlinkedhashmap-lru" version="1.3"/>
<dependency groupId="org.antlr" artifactId="antlr" version="3.2"/>
<dependency groupId="org.slf4j" artifactId="slf4j-api" version="1.7.2"/>
@@ -453,6 +455,7 @@
<dependency groupId="commons-cli" artifactId="commons-cli"/>
<dependency groupId="commons-codec" artifactId="commons-codec"/>
<dependency groupId="org.apache.commons" artifactId="commons-lang3"/>
+ <dependency groupId="org.apache.commons" artifactId="commons-math3"/>
<dependency groupId="com.googlecode.concurrentlinkedhashmap" artifactId="concurrentlinkedhashmap-lru"/>
<dependency groupId="org.antlr" artifactId="antlr"/>
<dependency groupId="org.slf4j" artifactId="slf4j-api"/>
@@ -703,6 +706,9 @@
<fileset dir="${build.lib}">
<include name="**/*.jar" />
</fileset>
+ <fileset dir="${build.tools.lib}">
+ <include name="**/*.jar" />
+ </fileset>
</path>
</classpath>
</javac>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/lib/commons-math3-3.2.jar
----------------------------------------------------------------------
diff --git a/lib/commons-math3-3.2.jar b/lib/commons-math3-3.2.jar
new file mode 100644
index 0000000..f8b7db2
Binary files /dev/null and b/lib/commons-math3-3.2.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/bin/cassandra-stressd
----------------------------------------------------------------------
diff --git a/tools/bin/cassandra-stressd b/tools/bin/cassandra-stressd
index 8d337e5..9110c5d 100755
--- a/tools/bin/cassandra-stressd
+++ b/tools/bin/cassandra-stressd
@@ -17,23 +17,25 @@
# limitations under the License.
DESC="Cassandra Stress Test Daemon"
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+ for include in "`dirname $0`/cassandra.in.sh" \
+ "$HOME/.cassandra.in.sh" \
+ /usr/share/cassandra/cassandra.in.sh \
+ /usr/local/share/cassandra/cassandra.in.sh \
+ /opt/cassandra/cassandra.in.sh; do
+ if [ -r $include ]; then
+ . $include
+ break
+ fi
+ done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+ . $CASSANDRA_INCLUDE
+fi
-if [ "x$CLASSPATH" = "x" ]; then
-
- # execute from the build dir.
- if [ -d `dirname $0`/../../build/classes ]; then
- for directory in `dirname $0`/../../build/classes/*; do
- CLASSPATH=$CLASSPATH:$directory
- done
- else
- if [ -f `dirname $0`/../lib/stress.jar ]; then
- CLASSPATH=`dirname $0`/../lib/stress.jar
- fi
- fi
-
- for jar in `dirname $0`/../../lib/*.jar; do
- CLASSPATH=$CLASSPATH:$jar
- done
+if [ -x $JAVA_HOME/bin/java ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=`which java`
fi
if [ -x $JAVA_HOME/bin/java ]; then
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar
----------------------------------------------------------------------
diff --git a/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar
new file mode 100644
index 0000000..1f4dafd
Binary files /dev/null and b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT-jar-with-dependencies.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar
----------------------------------------------------------------------
diff --git a/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar
new file mode 100644
index 0000000..c0d4242
Binary files /dev/null and b/tools/lib/cassandra-driver-core-2.0.0-rc2-SNAPSHOT.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
new file mode 100644
index 0000000..fa7a453
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.stress.generatedata.KeyGen;
+import org.apache.cassandra.stress.generatedata.RowGen;
+import org.apache.cassandra.stress.settings.Command;
+import org.apache.cassandra.stress.settings.CqlVersion;
+import org.apache.cassandra.stress.settings.SettingsCommandMixed;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.utils.ByteBufferUtil;
+/** Base class for one stress operation: holds the operation index and its State, and offers key/row generation plus a timed retry helper. */
+public abstract class Operation
+{
+ public final long index;
+ protected final State state;
+ /** Stores the given state and operation index. */
+ public Operation(State state, long idx)
+ {
+ index = idx;
+ this.state = state;
+ }
+ /** Unit of work for timeWithRetry: run() reports success, key() is used in the failure message, keyCount() is passed to the timer. */
+ public static interface RunOp
+ {
+ public boolean run() throws Exception;
+ public String key();
+ public int keyCount();
+ }
+
+ // Settings, timer, generators and metrics used by operations - one State per thread!
+ public static final class State
+ {
+
+ public final StressSettings settings;
+ public final Timer timer;
+ public final Command type;
+ public final KeyGen keyGen;
+ public final RowGen rowGen;
+ public final List<ColumnParent> columnParents;
+ public final StressMetrics metrics;
+ public final SettingsCommandMixed.CommandSelector readWriteSelector;
+ private Object cqlCache;
+ /** Builds the state for a command type: new timer from the metrics, key/row generators from the settings, selector only for MIXED. */
+ public State(Command type, StressSettings settings, StressMetrics metrics)
+ {
+ this.type = type;
+ this.timer = metrics.getTiming().newTimer();
+ if (type == Command.MIXED)
+ readWriteSelector = ((SettingsCommandMixed) settings.command).selector();
+ else
+ readWriteSelector = null;
+ this.settings = settings;
+ this.keyGen = settings.keys.newKeyGen();
+ this.rowGen = settings.columns.newRowGen();
+ this.metrics = metrics;
+ if (!settings.columns.useSuperColumns)
+ columnParents = Collections.singletonList(new ColumnParent(settings.schema.columnFamily));
+ else
+ {
+ ColumnParent[] cp = new ColumnParent[settings.columns.superColumns];
+ for (int i = 0 ; i < cp.length ; i++)
+ cp[i] = new ColumnParent("Super1").setSuper_column(ByteBufferUtil.bytes("S" + i));
+ columnParents = Arrays.asList(cp);
+ }
+ }
+ public boolean isCql3()
+ {
+ return settings.mode.cqlVersion == CqlVersion.CQL3;
+ }
+ public boolean isCql2()
+ {
+ return settings.mode.cqlVersion == CqlVersion.CQL2;
+ }
+ public Object getCqlCache()
+ {
+ return cqlCache;
+ }
+ public void storeCqlCache(Object val)
+ {
+ cqlCache = val;
+ }
+ }
+ /** Requests one key from the key generator for this operation's index and returns it. */
+ protected ByteBuffer getKey()
+ {
+ return state.keyGen.getKeys(1, index).get(0);
+ }
+ /** Requests {@code count} keys from the key generator for this operation's index. */
+ protected List<ByteBuffer> getKeys(int count)
+ {
+ return state.keyGen.getKeys(count, index);
+ }
+ /** Requests the column values from the row generator for this operation's index. */
+ protected List<ByteBuffer> generateColumnValues()
+ {
+ return state.rowGen.generate(index);
+ }
+
+ /**
+ * Run operation
+ * @param client Cassandra Thrift client connection
+ * @throws IOException on any I/O error.
+ */
+ public abstract void run(ThriftClient client) throws IOException;
+ /** SimpleClient variant; throws UnsupportedOperationException unless a subclass overrides it. */
+ public void run(SimpleClient client) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ /** JavaDriverClient variant; throws UnsupportedOperationException unless a subclass overrides it. */
+ public void run(JavaDriverClient client) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ /** Runs {@code run} under the state's timer, trying up to settings.command.tries times; calls error() if no attempt succeeded. */
+ public void timeWithRetry(RunOp run) throws IOException
+ {
+ state.timer.start();
+
+ boolean success = false;
+ String exceptionMessage = null;
+
+ for (int t = 0; t < state.settings.command.tries; t++)
+ {
+ if (success)
+ break;
+
+ try
+ {
+ success = run.run();
+ }
+ catch (Exception e)
+ {
+ System.err.println(e);
+ exceptionMessage = getExceptionMessage(e);
+ success = false;
+ }
+ }
+ // the timer is stopped only here, so the measured interval spans every attempt
+ state.timer.stop(run.keyCount());
+ // NOTE(review): getExceptionMessage() already returns text starting with "(ClassName)", so the parentheses added below are doubled
+ if (!success)
+ {
+ error(String.format("Operation [%d] retried %d times - error executing for key %s %s%n",
+ index,
+ state.settings.command.tries,
+ run.key(),
+ (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+ }
+
+ }
+ /** Formats an exception as "(ClassName)" or "(ClassName): message", taking the message from getWhy() for InvalidRequestException. */
+ protected String getExceptionMessage(Exception e)
+ {
+ String className = e.getClass().getSimpleName();
+ String message = (e instanceof InvalidRequestException) ? ((InvalidRequestException) e).getWhy() : e.getMessage();
+ return (message == null) ? "(" + className + ")" : String.format("(%s): %s", className, message);
+ }
+ /** Throws an IOException carrying the message, unless settings.command.ignoreErrors is set, in which case the message is printed to stderr. */
+ protected void error(String message) throws IOException
+ {
+ if (!state.settings.command.ignoreErrors)
+ throw new IOException(message);
+ else
+ System.err.println(message);
+ }
+ /** Returns the column name "C" + i as bytes (via ByteBufferUtil.bytes). */
+ public static ByteBuffer getColumnNameBytes(int i)
+ {
+ return ByteBufferUtil.bytes("C" + i);
+ }
+ /** Returns the column name "C" + i. */
+ public static String getColumnName(int i)
+ {
+ return "C" + i;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
deleted file mode 100644
index 8d138f5..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ /dev/null
@@ -1,841 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.cli.*;
-import org.apache.commons.lang3.StringUtils;
-
-import com.yammer.metrics.Metrics;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.cli.transport.FramedTransportFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-import org.apache.cassandra.db.ColumnFamilyType;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportFactory;
-
-public class Session implements Serializable
-{
- // command line options
- public static final Options availableOptions = new Options();
-
- public static final String KEYSPACE_NAME = "Keyspace1";
- public static final String DEFAULT_COMPARATOR = "AsciiType";
- public static final String DEFAULT_VALIDATOR = "BytesType";
-
- private static InetAddress localInetAddress;
-
- public final AtomicInteger operations = new AtomicInteger();
- public final AtomicInteger keys = new AtomicInteger();
- public final com.yammer.metrics.core.Timer latency = Metrics.newTimer(Session.class, "latency");
-
- private static final String SSL_TRUSTSTORE = "truststore";
- private static final String SSL_TRUSTSTORE_PW = "truststore-password";
- private static final String SSL_PROTOCOL = "ssl-protocol";
- private static final String SSL_ALGORITHM = "ssl-alg";
- private static final String SSL_STORE_TYPE = "store-type";
- private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
-
- static
- {
- availableOptions.addOption("h", "help", false, "Show this help message and exit");
- availableOptions.addOption("n", "num-keys", true, "Number of keys, default:1000000");
- availableOptions.addOption("F", "num-different-keys", true, "Number of different keys (if < NUM-KEYS, the same key will re-used multiple times), default:NUM-KEYS");
- availableOptions.addOption("N", "skip-keys", true, "Fraction of keys to skip initially, default:0");
- availableOptions.addOption("t", "threads", true, "Number of threads to use, default:50");
- availableOptions.addOption("c", "cells", true, "Number of cells per key, default:5");
- availableOptions.addOption("S", "column-size", true, "Size of column values in bytes, default:34");
- availableOptions.addOption("C", "cardinality", true, "Number of unique values stored in cells, default:50");
- availableOptions.addOption("d", "nodes", true, "Host nodes (comma separated), default:locahost");
- availableOptions.addOption("D", "nodesfile", true, "File containing host nodes (one per line)");
- availableOptions.addOption("s", "stdev", true, "Standard Deviation Factor, default:0.1");
- availableOptions.addOption("r", "random", false, "Use random key generator (STDEV will have no effect), default:false");
- availableOptions.addOption("f", "file", true, "Write output to given file");
- availableOptions.addOption("p", "port", true, "Thrift port, default:9160");
- availableOptions.addOption("o", "operation", true, "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET), default:INSERT");
- availableOptions.addOption("u", "supercolumns", true, "Number of super columns per key, default:1");
- availableOptions.addOption("y", "family-type", true, "Column Family Type (Super, Standard), default:Standard");
- availableOptions.addOption("K", "keep-trying", true, "Retry on-going operation N times (in case of failure). positive integer, default:10");
- availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false");
- availableOptions.addOption("i", "progress-interval", true, "Progress Report Interval (seconds), default:10");
- availableOptions.addOption("g", "keys-per-call", true, "Number of keys to get_range_slices or multiget per call, default:1000");
- availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1");
- availableOptions.addOption("L", "enable-cql", false, "Perform queries using CQL2 (Cassandra Query Language v 2.0.0)");
- availableOptions.addOption("L3", "enable-cql3", false, "Perform queries using CQL3 (Cassandra Query Language v 3.0.0)");
- availableOptions.addOption("b", "enable-native-protocol", false, "Use the binary native protocol (only work along with -L3)");
- availableOptions.addOption("P", "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL).");
- availableOptions.addOption("e", "consistency-level", true, "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
- availableOptions.addOption("x", "create-index", true, "Type of index to create on needed column families (KEYS)");
- availableOptions.addOption("R", "replication-strategy", true, "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy");
- availableOptions.addOption("O", "strategy-properties", true, "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
- availableOptions.addOption("W", "no-replicate-on-write",false, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
- availableOptions.addOption("V", "average-size-values", false, "Generate column values of average rather than specific size");
- availableOptions.addOption("T", "send-to", true, "Send this as a request to the stress daemon at specified address.");
- availableOptions.addOption("I", "compression", true, "Specify the compression to use for sstable, default:no compression");
- availableOptions.addOption("Q", "query-names", true, "Comma-separated list of column names to retrieve from each row.");
- availableOptions.addOption("Z", "compaction-strategy", true, "CompactionStrategy to use.");
- availableOptions.addOption("U", "comparator", true, "Cell Comparator to use. Currently supported types are: TimeUUIDType, AsciiType, UTF8Type.");
- availableOptions.addOption("tf", "transport-factory", true, "Fully-qualified TTransportFactory class name for creating a connection. Note: For Thrift over SSL, use org.apache.cassandra.stress.SSLTransportFactory.");
- availableOptions.addOption("ns", "no-statistics", false, "Turn off the aggegate statistics that is normally output after completion.");
- availableOptions.addOption("ts", SSL_TRUSTSTORE, true, "SSL: full path to truststore");
- availableOptions.addOption("tspw", SSL_TRUSTSTORE_PW, true, "SSL: full path to truststore");
- availableOptions.addOption("prtcl", SSL_PROTOCOL, true, "SSL: connections protocol to use (default: TLS)");
- availableOptions.addOption("alg", SSL_ALGORITHM, true, "SSL: algorithm (default: SunX509)");
- availableOptions.addOption("st", SSL_STORE_TYPE, true, "SSL: type of store");
- availableOptions.addOption("ciphers", SSL_CIPHER_SUITES, true, "SSL: comma-separated list of encryption suites to use");
- availableOptions.addOption("th", "throttle", true, "Throttle the total number of operations per second to a maximum amount.");
- availableOptions.addOption("un", "username", true, "Username for authentication.");
- availableOptions.addOption("pw", "password", true, "Password for authentication.");
- }
-
- private int numKeys = 1000 * 1000;
- private int numDifferentKeys = numKeys;
- private float skipKeys = 0;
- private int threads = 50;
- private int columns = 5;
- private int columnSize = 34;
- private int cardinality = 50;
- public String[] nodes = new String[] { "127.0.0.1" };
- private boolean random = false;
- private int retryTimes = 10;
- public int port = 9160;
- private int superColumns = 1;
- private String compression = null;
- private String compactionStrategy = null;
- private String username = null;
- private String password = null;
-
- private int progressInterval = 10;
- private int keysPerCall = 1000;
- private boolean replicateOnWrite = true;
- private boolean ignoreErrors = false;
- private boolean enable_cql = false;
- private boolean use_prepared = false;
- private boolean trace = false;
- private boolean captureStatistics = true;
- public boolean use_native_protocol = false;
- private double maxOpsPerSecond = Double.MAX_VALUE;
-
- private final String outFileName;
-
- private IndexType indexType = null;
- private Stress.Operations operation = Stress.Operations.INSERT;
- private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
- private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
- private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
- private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
-
- // if we know exactly column names that we want to read (set by -Q option)
- public final List<ByteBuffer> columnNames;
-
- public String cqlVersion;
-
- public final boolean averageSizeValues;
-
- // required by Gaussian distribution.
- protected int mean;
- protected float sigma;
-
- public final InetAddress sendToDaemon;
- public final String comparator;
- public final boolean timeUUIDComparator;
- public double traceProbability = 0.0;
- public EncryptionOptions encOptions = new ClientEncryptionOptions();
- public TTransportFactory transportFactory = new FramedTransportFactory();
-
- public Session(String[] arguments) throws IllegalArgumentException, SyntaxException
- {
- float STDev = 0.1f;
- CommandLineParser parser = new PosixParser();
-
- try
- {
- CommandLine cmd = parser.parse(availableOptions, arguments);
-
- if (cmd.getArgs().length > 0)
- {
- System.err.println("Application does not allow arbitrary arguments: " + StringUtils.join(cmd.getArgList(), ", "));
- System.exit(1);
- }
-
- if (cmd.hasOption("h"))
- throw new IllegalArgumentException("help");
-
- if (cmd.hasOption("n"))
- numKeys = Integer.parseInt(cmd.getOptionValue("n"));
-
- if (cmd.hasOption("F"))
- numDifferentKeys = Integer.parseInt(cmd.getOptionValue("F"));
- else
- numDifferentKeys = numKeys;
-
- if (cmd.hasOption("N"))
- skipKeys = Float.parseFloat(cmd.getOptionValue("N"));
-
- if (cmd.hasOption("t"))
- threads = Integer.parseInt(cmd.getOptionValue("t"));
-
- if (cmd.hasOption("c"))
- columns = Integer.parseInt(cmd.getOptionValue("c"));
-
- if (cmd.hasOption("S"))
- columnSize = Integer.parseInt(cmd.getOptionValue("S"));
-
- if (cmd.hasOption("C"))
- cardinality = Integer.parseInt(cmd.getOptionValue("C"));
-
- if (cmd.hasOption("d"))
- nodes = cmd.getOptionValue("d").split(",");
-
- if (cmd.hasOption("D"))
- {
- try
- {
- String node;
- List<String> tmpNodes = new ArrayList<String>();
- BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(cmd.getOptionValue("D"))));
- try
- {
- while ((node = in.readLine()) != null)
- {
- if (node.length() > 0)
- tmpNodes.add(node);
- }
- nodes = tmpNodes.toArray(new String[tmpNodes.size()]);
- }
- finally
- {
- in.close();
- }
- }
- catch(IOException ioe)
- {
- throw new RuntimeException(ioe);
- }
- }
-
- if (cmd.hasOption("s"))
- STDev = Float.parseFloat(cmd.getOptionValue("s"));
-
- if (cmd.hasOption("r"))
- random = true;
-
- outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null;
-
- if (cmd.hasOption("p"))
- port = Integer.parseInt(cmd.getOptionValue("p"));
-
- if (cmd.hasOption("o"))
- operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase());
-
- if (cmd.hasOption("u"))
- superColumns = Integer.parseInt(cmd.getOptionValue("u"));
-
- if (cmd.hasOption("y"))
- columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y"));
-
- if (cmd.hasOption("K"))
- {
- retryTimes = Integer.valueOf(cmd.getOptionValue("K"));
-
- if (retryTimes <= 0)
- {
- throw new RuntimeException("--keep-trying option value should be > 0");
- }
- }
-
- if (cmd.hasOption("k"))
- {
- retryTimes = 1;
- ignoreErrors = true;
- }
-
-
- if (cmd.hasOption("i"))
- progressInterval = Integer.parseInt(cmd.getOptionValue("i"));
-
- if (cmd.hasOption("g"))
- keysPerCall = Integer.parseInt(cmd.getOptionValue("g"));
-
- if (cmd.hasOption("th"))
- maxOpsPerSecond = Double.parseDouble(cmd.getOptionValue("th"));
-
- if (cmd.hasOption("e"))
- consistencyLevel = ConsistencyLevel.valueOf(cmd.getOptionValue("e").toUpperCase());
-
- if (cmd.hasOption("x"))
- indexType = IndexType.valueOf(cmd.getOptionValue("x").toUpperCase());
-
- if (cmd.hasOption("R"))
- replicationStrategy = cmd.getOptionValue("R");
-
- if (cmd.hasOption("l"))
- replicationStrategyOptions.put("replication_factor", String.valueOf(Integer.parseInt(cmd.getOptionValue("l"))));
- else if (replicationStrategy.endsWith("SimpleStrategy"))
- replicationStrategyOptions.put("replication_factor", "1");
-
- if (cmd.hasOption("L"))
- {
- enable_cql = true;
- cqlVersion = "2.0.0";
- }
-
- if (cmd.hasOption("L3"))
- {
- enable_cql = true;
- cqlVersion = "3.0.0";
- }
-
- if (cmd.hasOption("b"))
- {
- if (!(enable_cql && cqlVersion.startsWith("3")))
- throw new IllegalArgumentException("Cannot use binary protocol without -L3");
- use_native_protocol = true;
- }
-
- if (cmd.hasOption("P"))
- {
- if (!enable_cql)
- {
- System.err.println("-P/--use-prepared-statements is only applicable with CQL (-L/--enable-cql)");
- System.exit(-1);
- }
- use_prepared = true;
- }
-
- if (cmd.hasOption("O"))
- {
- String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ',');
-
- for (String pair : pairs)
- {
- String[] keyAndValue = StringUtils.split(pair, ':');
-
- if (keyAndValue.length != 2)
- throw new RuntimeException("Invalid --strategy-properties value.");
-
- replicationStrategyOptions.put(keyAndValue[0], keyAndValue[1]);
- }
- }
-
- if (cmd.hasOption("W"))
- replicateOnWrite = false;
-
- if (cmd.hasOption("I"))
- compression = cmd.getOptionValue("I");
-
- averageSizeValues = cmd.hasOption("V");
-
- try
- {
- sendToDaemon = cmd.hasOption("send-to")
- ? InetAddress.getByName(cmd.getOptionValue("send-to"))
- : null;
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
-
- if (cmd.hasOption("Q"))
- {
- AbstractType comparator = TypeParser.parse(DEFAULT_COMPARATOR);
-
- String[] names = StringUtils.split(cmd.getOptionValue("Q"), ",");
- columnNames = new ArrayList<ByteBuffer>(names.length);
-
- for (String columnName : names)
- columnNames.add(comparator.fromString(columnName));
- }
- else
- {
- columnNames = null;
- }
-
- if (cmd.hasOption("Z"))
- {
- compactionStrategy = cmd.getOptionValue("Z");
-
- try
- {
- // validate compaction strategy class
- CFMetaData.createCompactionStrategy(compactionStrategy);
- }
- catch (ConfigurationException e)
- {
- System.err.println(e.getMessage());
- System.exit(1);
- }
- }
-
- if (cmd.hasOption("U"))
- {
- AbstractType parsed = null;
-
- try
- {
- parsed = TypeParser.parse(cmd.getOptionValue("U"));
- }
- catch (ConfigurationException e)
- {
- System.err.println(e.getMessage());
- System.exit(1);
- }
-
- comparator = cmd.getOptionValue("U");
- timeUUIDComparator = parsed instanceof TimeUUIDType;
-
- if (!(parsed instanceof TimeUUIDType || parsed instanceof AsciiType || parsed instanceof UTF8Type))
- {
- System.err.println("Currently supported types are: TimeUUIDType, AsciiType, UTF8Type.");
- System.exit(1);
- }
- }
- else
- {
- comparator = null;
- timeUUIDComparator = false;
- }
-
- if (cmd.hasOption("ns"))
- {
- captureStatistics = false;
- }
-
- if(cmd.hasOption(SSL_TRUSTSTORE))
- encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
-
- if(cmd.hasOption(SSL_TRUSTSTORE_PW))
- encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW);
-
- if(cmd.hasOption(SSL_PROTOCOL))
- encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL);
-
- if(cmd.hasOption(SSL_ALGORITHM))
- encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM);
-
- if(cmd.hasOption(SSL_STORE_TYPE))
- encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE);
-
- if(cmd.hasOption(SSL_CIPHER_SUITES))
- encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
-
- if (cmd.hasOption("tf"))
- transportFactory = validateAndSetTransportFactory(cmd.getOptionValue("tf"));
-
- if (cmd.hasOption("un"))
- username = cmd.getOptionValue("un");
-
- if (cmd.hasOption("pw"))
- password = cmd.getOptionValue("pw");
- }
- catch (ParseException e)
- {
- throw new IllegalArgumentException(e.getMessage(), e);
- }
- catch (ConfigurationException e)
- {
- throw new IllegalStateException(e.getMessage(), e);
- }
-
- mean = numDifferentKeys / 2;
- sigma = numDifferentKeys * STDev;
- }
-
- private TTransportFactory validateAndSetTransportFactory(String transportFactory)
- {
- try
- {
- Class factory = Class.forName(transportFactory);
-
- if(!TTransportFactory.class.isAssignableFrom(factory))
- throw new IllegalArgumentException(String.format("transport factory '%s' " +
- "not derived from TTransportFactory", transportFactory));
-
- return (TTransportFactory) factory.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
- }
- }
-
- public int getCardinality()
- {
- return cardinality;
- }
-
- public int getColumnSize()
- {
- return columnSize;
- }
-
- public int getColumnsPerKey()
- {
- return columns;
- }
-
- public ColumnFamilyType getColumnFamilyType()
- {
- return columnFamilyType;
- }
-
- public int getNumKeys()
- {
- return numKeys;
- }
-
- public int getNumDifferentKeys()
- {
- return numDifferentKeys;
- }
-
- public int getThreads()
- {
- return threads;
- }
-
- public double getMaxOpsPerSecond()
- {
- return maxOpsPerSecond;
- }
-
- public float getSkipKeys()
- {
- return skipKeys;
- }
-
- public int getSuperColumns()
- {
- return superColumns;
- }
-
- public int getKeysPerThread()
- {
- return numKeys / threads;
- }
-
- public int getTotalKeysLength()
- {
- return Integer.toString(numDifferentKeys).length();
- }
-
- public ConsistencyLevel getConsistencyLevel()
- {
- return consistencyLevel;
- }
-
- public int getRetryTimes()
- {
- return retryTimes;
- }
-
- public boolean ignoreErrors()
- {
- return ignoreErrors;
- }
-
- public Stress.Operations getOperation()
- {
- return operation;
- }
-
- public PrintStream getOutputStream()
- {
- try
- {
- return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName));
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- public int getProgressInterval()
- {
- return progressInterval;
- }
-
- public boolean useRandomGenerator()
- {
- return random;
- }
-
- public int getKeysPerCall()
- {
- return keysPerCall;
- }
-
- // required by Gaussian distribution
- public int getMean()
- {
- return mean;
- }
-
- // required by Gaussian distribution
- public float getSigma()
- {
- return sigma;
- }
-
- public boolean isCQL()
- {
- return enable_cql;
- }
-
- public boolean usePreparedStatements()
- {
- return use_prepared;
- }
-
- public boolean outputStatistics()
- {
- return captureStatistics;
- }
-
- /**
- * Create Keyspace with Standard and Super/Counter column families
- */
- public void createKeySpaces()
- {
- KsDef keyspace = new KsDef();
- String defaultComparator = comparator == null ? DEFAULT_COMPARATOR : comparator;
-
- // column family for standard columns
- CfDef standardCfDef = new CfDef(KEYSPACE_NAME, "Standard1");
- Map<String, String> compressionOptions = new HashMap<String, String>();
- if (compression != null)
- compressionOptions.put("sstable_compression", compression);
-
- standardCfDef.setComparator_type(defaultComparator)
- .setDefault_validation_class(DEFAULT_VALIDATOR)
- .setCompression_options(compressionOptions);
-
- if (!timeUUIDComparator)
- {
- for (int i = 0; i < getColumnsPerKey(); i++)
- {
- standardCfDef.addToColumn_metadata(new ColumnDef(ByteBufferUtil.bytes("C" + i), "BytesType"));
- }
- }
-
- if (indexType != null)
- {
- ColumnDef standardColumn = new ColumnDef(ByteBufferUtil.bytes("C1"), "BytesType");
- standardColumn.setIndex_type(indexType).setIndex_name("Idx1");
- standardCfDef.setColumn_metadata(Arrays.asList(standardColumn));
- }
-
- // column family with super columns
- CfDef superCfDef = new CfDef(KEYSPACE_NAME, "Super1").setColumn_type("Super");
- superCfDef.setComparator_type(DEFAULT_COMPARATOR)
- .setSubcomparator_type(defaultComparator)
- .setDefault_validation_class(DEFAULT_VALIDATOR)
- .setCompression_options(compressionOptions);
-
- // column family for standard counters
- CfDef counterCfDef = new CfDef(KEYSPACE_NAME, "Counter1").setComparator_type(defaultComparator)
- .setComparator_type(defaultComparator)
- .setDefault_validation_class("CounterColumnType")
- .setReplicate_on_write(replicateOnWrite)
- .setCompression_options(compressionOptions);
-
- // column family with counter super columns
- CfDef counterSuperCfDef = new CfDef(KEYSPACE_NAME, "SuperCounter1").setComparator_type(defaultComparator)
- .setDefault_validation_class("CounterColumnType")
- .setReplicate_on_write(replicateOnWrite)
- .setColumn_type("Super")
- .setCompression_options(compressionOptions);
-
- keyspace.setName(KEYSPACE_NAME);
- keyspace.setStrategy_class(replicationStrategy);
-
- if (!replicationStrategyOptions.isEmpty())
- {
- keyspace.setStrategy_options(replicationStrategyOptions);
- }
-
- if (compactionStrategy != null)
- {
- standardCfDef.setCompaction_strategy(compactionStrategy);
- superCfDef.setCompaction_strategy(compactionStrategy);
- counterCfDef.setCompaction_strategy(compactionStrategy);
- counterSuperCfDef.setCompaction_strategy(compactionStrategy);
- }
-
- keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef, counterCfDef, counterSuperCfDef)));
-
- CassandraClient client = getClient(false);
-
- try
- {
- client.system_add_keyspace(keyspace);
-
- /* CQL3 counter cf */
- client.set_cql_version("3.0.0"); // just to create counter cf for cql3
-
- client.set_keyspace(KEYSPACE_NAME);
- client.execute_cql3_query(createCounterCFStatementForCQL3(), Compression.NONE, ConsistencyLevel.ONE);
-
- if (enable_cql)
- client.set_cql_version(cqlVersion);
- /* end */
-
- System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
- Thread.sleep(nodes.length * 1000); // seconds
- }
- catch (InvalidRequestException e)
- {
- System.err.println("Unable to create stress keyspace: " + e.getWhy());
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- }
- }
-
- /**
- * Thrift client connection with Keyspace1 set.
- * @return cassandra client connection
- */
- public CassandraClient getClient()
- {
- return getClient(true);
- }
-
- /**
- * Thrift client connection
- * @param setKeyspace - should we set keyspace for client or not
- * @return cassandra client connection
- */
- public CassandraClient getClient(boolean setKeyspace)
- {
- // random node selection for fake load balancing
- String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
-
- TSocket socket = new TSocket(currentNode, port);
- TTransport transport = transportFactory.getTransport(socket);
- CassandraClient client = new CassandraClient(new TBinaryProtocol(transport));
-
- try
- {
- if (!transport.isOpen())
- transport.open();
-
- if (enable_cql)
- client.set_cql_version(cqlVersion);
-
- if (setKeyspace)
- client.set_keyspace("Keyspace1");
-
- if (username != null && password != null)
- {
- Map<String, String> credentials = new HashMap<String, String>();
- credentials.put(IAuthenticator.USERNAME_KEY, username);
- credentials.put(IAuthenticator.PASSWORD_KEY, password);
- AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
- client.login(authenticationRequest);
- }
- }
- catch (AuthenticationException e)
- {
- throw new RuntimeException(e.getWhy());
- }
- catch (AuthorizationException e)
- {
- throw new RuntimeException(e.getWhy());
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e.getWhy());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
-
- return client;
- }
-
- public SimpleClient getNativeClient()
- {
- try
- {
- String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)];
- SimpleClient client = new SimpleClient(currentNode, 9042);
- client.connect(false);
- client.execute("USE \"Keyspace1\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
- return client;
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage());
- }
- }
-
- public static InetAddress getLocalAddress()
- {
- if (localInetAddress == null)
- {
- try
- {
- localInetAddress = InetAddress.getLocalHost();
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- return localInetAddress;
- }
-
- private ByteBuffer createCounterCFStatementForCQL3()
- {
- StringBuilder counter3 = new StringBuilder("CREATE TABLE \"Counter3\" (KEY blob PRIMARY KEY, ");
-
- for (int i = 0; i < getColumnsPerKey(); i++)
- {
- counter3.append("c").append(i).append(" counter");
- if (i != getColumnsPerKey() - 1)
- counter3.append(", ");
- }
- counter3.append(");");
-
- return ByteBufferUtil.bytes(counter3.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/Stress.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Stress.java b/tools/stress/src/org/apache/cassandra/stress/Stress.java
index 738a1c0..38af4f6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Stress.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Stress.java
@@ -17,48 +17,65 @@
*/
package org.apache.cassandra.stress;
-import org.apache.commons.cli.Option;
-
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
-import java.util.Random;
+
+import org.apache.cassandra.stress.settings.StressSettings;
public final class Stress
{
- public static enum Operations
- {
- INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET
- }
- public static Session session;
- public static Random randomizer = new Random();
+ /**
+ * Known issues:
+ * - uncertainty/stderr assumes op-rates are normally distributed. Due to GC (and possibly latency stepping from
+ * different media, though the variance of request ratio across media should be normally distributed), they are not.
+ * Should attempt to account for pauses in stderr calculation, possibly by assuming these pauses are a separate
+ * normally distributed occurrence
+ * - Under very mixed work loads, the uncertainty calculations and op/s reporting really don't mean much. Should
+ * consider breaking op/s down per workload, or should have a lower-bound on inspection interval based on clustering
+ * of operations and thread count.
+ *
+ *
+ * Future improvements:
+ * - Configurable connection compression
+ * - Java driver support
+ * - Per column data generators
+ * - Automatic column/schema detection if provided with a CF
+ * - target rate produces a very steady work rate, and if we want to simulate a real op rate for an
+ * application we should have some variation in the actual op rate within any time-slice.
+ * - auto rate should vary the thread count based on performance improvement, potentially starting on a very low
+ * thread count with a high error rate / low count to get some basic numbers
+ */
+
private static volatile boolean stopped = false;
public static void main(String[] arguments) throws Exception
{
+ final StressSettings settings;
try
{
- session = new Session(arguments);
+ settings = StressSettings.parse(arguments);
}
catch (IllegalArgumentException e)
{
printHelpMessage();
+ e.printStackTrace();
return;
}
- PrintStream outStream = session.getOutputStream();
+ PrintStream logout = settings.log.getOutput();
- if (session.sendToDaemon != null)
+ if (settings.sendToDaemon != null)
{
- Socket socket = new Socket(session.sendToDaemon, 2159);
+ Socket socket = new Socket(settings.sendToDaemon, 2159);
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out));
- out.writeObject(session);
+ out.writeObject(settings);
String line;
@@ -72,7 +89,7 @@ public final class Stress
break;
}
- outStream.println(line);
+ logout.println(line);
}
}
catch (SocketException e)
@@ -88,10 +105,8 @@ public final class Stress
}
else
{
- StressAction stressAction = new StressAction(session, outStream);
- stressAction.start();
- stressAction.join();
- System.exit(stressAction.getReturnCode());
+ StressAction stressAction = new StressAction(settings, logout);
+ stressAction.run();
}
}
@@ -100,15 +115,7 @@ public final class Stress
*/
public static void printHelpMessage()
{
- System.out.println("Usage: ./bin/cassandra-stress [options]\n\nOptions:");
-
- for(Object o : Session.availableOptions.getOptions())
- {
- Option option = (Option) o;
- String upperCaseName = option.getLongOpt().toUpperCase();
- System.out.println(String.format("-%s%s, --%s%s%n\t\t%s%n", option.getOpt(), (option.hasArg()) ? " "+upperCaseName : "",
- option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
- }
+ StressSettings.printHelp();
}
private static class ShutDown extends Thread
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e1e98ad/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 7098d0b..0312093 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -17,322 +17,527 @@
*/
package org.apache.cassandra.stress;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintStream;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.util.concurrent.Uninterruptibles;
import com.google.common.util.concurrent.RateLimiter;
-import com.yammer.metrics.stats.Snapshot;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.CassandraClient;
-import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.stress.settings.*;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.transport.SimpleClient;
-public class StressAction extends Thread
+public class StressAction implements Runnable
{
- /**
- * Producer-Consumer model: 1 producer, N consumers
- */
- private final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
- private final Session client;
+ private final StressSettings settings;
private final PrintStream output;
- private volatile boolean stop = false;
-
- public static final int SUCCESS = 0;
- public static final int FAILURE = 1;
-
- private volatile int returnCode = -1;
-
- public StressAction(Session session, PrintStream out)
+ public StressAction(StressSettings settings, PrintStream out)
{
- client = session;
+ this.settings = settings;
output = out;
}
public void run()
{
- Snapshot latency;
- long oldLatency;
- int epoch, total, oldTotal, keyCount, oldKeyCount;
-
// creating keyspace and column families
- if (client.getOperation() == Stress.Operations.INSERT || client.getOperation() == Stress.Operations.COUNTER_ADD)
- client.createKeySpaces();
+ settings.maybeCreateKeyspaces();
- int threadCount = client.getThreads();
- Consumer[] consumers = new Consumer[threadCount];
+ warmup(settings.command.type, settings.command);
- output.println("total,interval_op_rate,interval_key_rate,latency,95th,99.9th,elapsed_time");
+ output.println("Sleeping 2s...");
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
- int itemsPerThread = client.getKeysPerThread();
- int modulo = client.getNumKeys() % threadCount;
- RateLimiter rateLimiter = RateLimiter.create(client.getMaxOpsPerSecond());
+ boolean success;
+ if (settings.rate.auto)
+ success = runAuto();
+ else
+ success = null != run(settings.command.type, settings.rate.threadCount, settings.command.count, output);
- // creating required type of the threads for the test
- for (int i = 0; i < threadCount; i++) {
- if (i == threadCount - 1)
- itemsPerThread += modulo; // last one is going to handle N + modulo items
+ if (success)
+ output.println("END");
+ else
+ output.println("FAILURE");
- consumers[i] = new Consumer(itemsPerThread, rateLimiter);
- }
+ settings.disconnect();
+ }
- Producer producer = new Producer();
- producer.start();
+ // type is passed separately so that a mixed command can recurse once for each sub-command type it performs
+ private void warmup(Command type, SettingsCommand command)
+ {
+ // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
+ PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
+ int iterations;
+ switch (type.category)
+ {
+ case BASIC:
+ iterations = 50000;
+ break;
+ case MIXED:
+ for (Command subtype : ((SettingsCommandMixed) command).getCommands())
+ warmup(subtype, command);
+ return;
+ case MULTI:
+ int keysAtOnce = ((SettingsCommandMulti) command).keysAtOnce;
+ iterations = Math.min(50000, (int) Math.ceil(500000d / keysAtOnce));
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ output.println(String.format("Warming up %s with %d iterations...", type, iterations));
+ run(type, 20, iterations, warmupOutput);
+ }
- // starting worker threads
- for (int i = 0; i < threadCount; i++)
- consumers[i].start();
+ // TODO : permit varying more than just thread count
+ // TODO : vary thread count based on percentage improvement of previous increment, not by fixed amounts
+ private boolean runAuto()
+ {
+ int prevThreadCount = -1;
+ int threadCount = settings.rate.minAutoThreads;
+ List<StressMetrics> results = new ArrayList<>();
+ List<String> runIds = new ArrayList<>();
+ do
+ {
+ output.println(String.format("Running with %d threadCount", threadCount));
- // initialization of the values
- boolean terminate = false;
- epoch = total = keyCount = 0;
+ StressMetrics result = run(settings.command.type, threadCount, settings.command.count, output);
+ if (result == null)
+ return false;
+ results.add(result);
- int interval = client.getProgressInterval();
- int epochIntervals = client.getProgressInterval() * 10;
- long testStartTime = System.nanoTime();
-
- StressStatistics stats = new StressStatistics(client, output);
+ if (prevThreadCount > 0)
+ System.out.println(String.format("Improvement over %d threadCount: %.0f%%",
+ prevThreadCount, 100 * averageImprovement(results, 1)));
- while (!terminate)
- {
- if (stop)
- {
- producer.stopProducer();
-
- for (Consumer consumer : consumers)
- consumer.stopConsume();
+ runIds.add(threadCount + " threadCount");
+ prevThreadCount = threadCount;
+ if (threadCount < 16)
+ threadCount *= 2;
+ else
+ threadCount *= 1.5;
+ if (!results.isEmpty() && threadCount > settings.rate.maxAutoThreads)
break;
+
+ if (settings.command.type.updates)
+ {
+ // pause for an arbitrary period to let the commit log flush, etc.; this shouldn't make much difference
+ // as we only ever increase load, never decrease it
+ output.println("Sleeping for 15s");
+ try
+ {
+ Thread.sleep(15 * 1000);
+ } catch (InterruptedException e)
+ {
+ return false;
+ }
}
+ // run until average throughput improvement is negative over the previous three runs, or below the target uncertainty over the previous five
+ } while (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty));
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ // summarise all results
+ StressMetrics.summarise(runIds, results, output);
+ return true;
+ }
- int alive = 0;
- for (Thread thread : consumers)
- if (thread.isAlive()) alive++;
+ private boolean hasAverageImprovement(List<StressMetrics> results, int count, double minImprovement)
+ {
+ if (results.size() < count + 1)
+ return true;
+ return averageImprovement(results, count) >= minImprovement;
+ }
- if (alive == 0)
- terminate = true;
+ private double averageImprovement(List<StressMetrics> results, int count)
+ {
+ double improvement = 0;
+ for (int i = results.size() - count ; i < results.size() ; i++)
+ {
+ double prev = results.get(i - 1).getTiming().getHistory().realOpRate();
+ double cur = results.get(i).getTiming().getHistory().realOpRate();
+ improvement += (cur - prev) / prev;
+ }
+ return improvement / count;
+ }
- epoch++;
+ private StressMetrics run(Command type, int threadCount, long opCount, PrintStream output)
+ {
- if (terminate || epoch > epochIntervals)
- {
- epoch = 0;
+ output.println(String.format("Running %s with %d threads %s",
+ type.toString(),
+ threadCount,
+ opCount > 0 ? " for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty));
+ final WorkQueue workQueue;
+ if (opCount < 0)
+ workQueue = new ContinuousWorkQueue(50);
+ else
+ workQueue = FixedWorkQueue.build(opCount);
+
+ RateLimiter rateLimiter = null;
+ // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
+ if (settings.rate.opRateTargetPerSecond > 0)
+ rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond);
+
+ final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis);
+
+ final CountDownLatch done = new CountDownLatch(threadCount);
+ final Consumer[] consumers = new Consumer[threadCount];
+ for (int i = 0; i < threadCount; i++)
+ consumers[i] = new Consumer(type, done, workQueue, metrics, rateLimiter);
- oldTotal = total;
- oldKeyCount = keyCount;
+ // starting worker threads
+ for (int i = 0; i < threadCount; i++)
+ consumers[i].start();
- total = client.operations.get();
- keyCount = client.keys.get();
- latency = client.latency.getSnapshot();
+ metrics.start();
- int opDelta = total - oldTotal;
- int keyDelta = keyCount - oldKeyCount;
+ if (opCount <= 0)
+ {
+ try
+ {
+ metrics.waitUntilConverges(settings.command.targetUncertainty,
+ settings.command.minimumUncertaintyMeasurements,
+ settings.command.maximumUncertaintyMeasurements);
+ } catch (InterruptedException e) { }
+ workQueue.stop();
+ }
- long currentTimeInSeconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - testStartTime);
+ try
+ {
+ done.await();
+ metrics.stop();
+ } catch (InterruptedException e) {}
- output.println(String.format("%d,%d,%d,%.1f,%.1f,%.1f,%d",
- total,
- opDelta / interval,
- keyDelta / interval,
- latency.getMedian(), latency.get95thPercentile(), latency.get999thPercentile(),
- currentTimeInSeconds));
+ if (metrics.wasCancelled())
+ return null;
- if (client.outputStatistics()) {
- stats.addIntervalStats(total,
- opDelta / interval,
- keyDelta / interval,
- latency,
- currentTimeInSeconds);
- }
- }
- }
+ metrics.summarise();
- // if any consumer failed, set the return code to failure.
- returnCode = SUCCESS;
- if (producer.isAlive())
- {
- producer.interrupt(); // if producer is still alive it means that we had errors in the consumers
- returnCode = FAILURE;
- }
+ boolean success = true;
for (Consumer consumer : consumers)
- if (consumer.getReturnCode() == FAILURE)
- returnCode = FAILURE;
-
- if (returnCode == SUCCESS) {
- if (client.outputStatistics())
- stats.printStats();
- // marking an end of the output to the client
- output.println("END");
- } else {
- output.println("FAILURE");
- }
+ success &= consumer.success;
- }
+ if (!success)
+ return null;
- public int getReturnCode()
- {
- return returnCode;
+ return metrics;
}
- /**
- * Produces exactly N items (awaits each to be consumed)
- */
- private class Producer extends Thread
+ private class Consumer extends Thread
{
- private volatile boolean stop = false;
+
+ private final Operation.State state;
+ private final RateLimiter rateLimiter;
+ private volatile boolean success = true;
+ private final WorkQueue workQueue;
+ private final CountDownLatch done;
+
+ public Consumer(Command type, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter)
+ {
+ this.done = done;
+ this.rateLimiter = rateLimiter;
+ this.workQueue = workQueue;
+ this.state = new Operation.State(type, settings, metrics);
+ }
public void run()
{
- for (int i = 0; i < client.getNumKeys(); i++)
+
+ try
{
- if (stop)
- break;
- try
+ SimpleClient sclient = null;
+ ThriftClient tclient = null;
+ JavaDriverClient jclient = null;
+
+ switch (settings.mode.api)
{
- operations.put(createOperation(i % client.getNumDifferentKeys()));
+ case JAVA_DRIVER_NATIVE:
+ jclient = settings.getJavaDriverClient();
+ break;
+ case SIMPLE_NATIVE:
+ sclient = settings.getSimpleNativeClient();
+ break;
+ case THRIFT:
+ tclient = settings.getThriftClient();
+ break;
+ case THRIFT_SMART:
+ tclient = settings.getSmartThriftClient();
+ break;
+ default:
+ throw new IllegalStateException();
}
- catch (InterruptedException e)
+
+ Work work;
+ while ( null != (work = workQueue.poll()) )
{
- if (e.getMessage() != null)
- System.err.println("Producer error - " + e.getMessage());
- return;
+
+ if (rateLimiter != null)
+ rateLimiter.acquire(work.count);
+
+ for (int i = 0 ; i < work.count ; i++)
+ {
+ try
+ {
+ Operation op = createOperation(state, i + work.offset);
+ switch (settings.mode.api)
+ {
+ case JAVA_DRIVER_NATIVE:
+ op.run(jclient);
+ break;
+ case SIMPLE_NATIVE:
+ op.run(sclient);
+ break;
+ default:
+ op.run(tclient);
+ }
+ } catch (Exception e)
+ {
+ if (output == null)
+ {
+ System.err.println(e.getMessage());
+ success = false;
+ System.exit(-1);
+ }
+
+ e.printStackTrace(output);
+ success = false;
+ workQueue.stop();
+ state.metrics.cancel();
+ return;
+ }
+ }
}
+
}
+ finally
+ {
+ done.countDown();
+ state.timer.close();
+ }
+
}
- public void stopProducer()
+ }
+
+ private interface WorkQueue
+ {
+ // null indicates consumer should terminate
+ Work poll();
+
+ // signal all consumers to terminate
+ void stop();
+ }
+
+ private static final class Work
+ {
+ // index of operations
+ final long offset;
+
+ // how many operations to perform
+ final int count;
+
+ public Work(long offset, int count)
{
- stop = true;
+ this.offset = offset;
+ this.count = count;
}
}
- /**
- * Each consumes exactly N items from queue
- */
- private class Consumer extends Thread
+ private static final class FixedWorkQueue implements WorkQueue
{
- private final int items;
- private final RateLimiter rateLimiter;
- private volatile boolean stop = false;
- private volatile int returnCode = StressAction.SUCCESS;
- public Consumer(int toConsume, RateLimiter rateLimiter)
+ final ArrayBlockingQueue<Work> work;
+ volatile boolean stop = false;
+
+ public FixedWorkQueue(ArrayBlockingQueue<Work> work)
{
- items = toConsume;
- this.rateLimiter = rateLimiter;
+ this.work = work;
}
- public void run()
+ @Override
+ public Work poll()
+ {
+ if (stop)
+ return null;
+ return work.poll();
+ }
+
+ @Override
+ public void stop()
+ {
+ stop = true;
+ }
+
+ static FixedWorkQueue build(long operations)
{
- if (client.use_native_protocol)
+ // target splitting into around 50-500k work items, with a minimum batch size of 20 operations per item
+ if (operations > Integer.MAX_VALUE * (1L << 19))
+ throw new IllegalStateException("Cannot currently support more than approx 2^50 operations for one stress run. This is a LOT.");
+ int batchSize = (int) (operations / (1 << 19));
+ if (batchSize < 20)
+ batchSize = 20;
+ ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<Work>(
+ (int) ((operations / batchSize)
+ + (operations % batchSize == 0 ? 0 : 1))
+ );
+ long offset = 0;
+ while (offset < operations)
{
- SimpleClient connection = client.getNativeClient();
+ work.add(new Work(offset, (int) Math.min(batchSize, operations - offset)));
+ offset += batchSize;
+ }
+ return new FixedWorkQueue(work);
+ }
- for (int i = 0; i < items; i++)
- {
- if (stop)
- break;
+ }
- try
- {
- rateLimiter.acquire();
- operations.take().run(connection); // running job
- }
- catch (Exception e)
- {
- if (output == null)
- {
- System.err.println(e.getMessage());
- returnCode = StressAction.FAILURE;
- System.exit(-1);
- }
+ private static final class ContinuousWorkQueue implements WorkQueue // Work source with no fixed end: every poll() creates a fresh batch of batchSize operations until stop() is called.
+ {
- output.println(e.getMessage());
- returnCode = StressAction.FAILURE;
- break;
- }
- }
- }
- else
- {
- CassandraClient connection = client.getClient();
+ final AtomicLong offset = new AtomicLong(); // start offset of the next batch to hand out; begins at 0 and advances by batchSize per batch
+ final int batchSize; // number of operations covered by each Work handed out
+ volatile boolean stop = false; // set by stop(), checked by poll()
- for (int i = 0; i < items; i++)
- {
- if (stop)
- break;
+ private ContinuousWorkQueue(int batchSize)
+ {
+ this.batchSize = batchSize;
+ }
- try
- {
- rateLimiter.acquire();
- operations.take().run(connection); // running job
- }
- catch (Exception e)
- {
- if (output == null)
- {
- System.err.println(e.getMessage());
- returnCode = StressAction.FAILURE;
- System.exit(-1);
- }
+ @Override
+ public Work poll() // Returns a new Work starting at the next unclaimed offset, or null once stop() has been called.
+ {
+ if (stop)
+ return null;
+ return new Work(nextOffset(), batchSize);
+ }
- output.println(e.getMessage());
- returnCode = StressAction.FAILURE;
- break;
- }
- }
+ private long nextOffset() // Claims the current offset, advances the shared counter by batchSize, and returns the claimed value.
+ {
+ final int inc = batchSize;
+ while (true) // retry until the compare-and-set succeeds. NOTE(review): this loop appears equivalent to offset.getAndAdd(inc) - confirm before simplifying
+ {
+ final long cur = offset.get();
+ if (offset.compareAndSet(cur, cur + inc))
+ return cur;
}
}
- public void stopConsume()
+ @Override
+ public void stop() // Sets the stop flag so that subsequent poll() calls return null.
{
stop = true;
}
- public int getReturnCode()
- {
- return returnCode;
- }
}
- private Operation createOperation(int index)
+ private Operation createOperation(Operation.State state, long index) // Convenience overload: builds the operation for the command type stored in state.type.
+ {
+ return createOperation(state.type, state, index);
+ }
+ private Operation createOperation(Command type, Operation.State state, long index) // Builds the operation object for the given command: picks the Thrift or Cql implementation from state.settings.mode.style, and throws UnsupportedOperationException for any other style or an unhandled command.
{
- switch (client.getOperation())
+ switch (type)
{
case READ:
- return client.isCQL() ? new CqlReader(client, index) : new Reader(client, index);
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftReader(state, index);
+ case CQL:
+ case CQL_PREPARED: // here and in every case below, CQL and CQL_PREPARED fall through to the same Cql* class
+ return new CqlReader(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+
+ case COUNTERREAD:
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftCounterGetter(state, index);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlCounterGetter(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ case WRITE:
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftInserter(state, index);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlInserter(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
- case COUNTER_GET:
- return client.isCQL() ? new CqlCounterGetter(client, index) : new CounterGetter(client, index);
+ case COUNTERWRITE:
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftCounterAdder(state, index);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlCounterAdder(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
- case INSERT:
- return client.isCQL() ? new CqlInserter(client, index) : new Inserter(client, index);
+ case RANGESLICE:
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftRangeSlicer(state, index);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlRangeSlicer(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
- case COUNTER_ADD:
- return client.isCQL() ? new CqlCounterAdder(client, index) : new CounterAdder(client, index);
+ case IRANGESLICE:
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftIndexedRangeSlicer(state, index);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlIndexedRangeSlicer(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
- case RANGE_SLICE:
- return client.isCQL() ? new CqlRangeSlicer(client, index) : new RangeSlicer(client, index);
+ case READMULTI:
+ switch(state.settings.mode.style)
+ {
+ case THRIFT:
+ return new ThriftMultiGetter(state, index);
+ case CQL:
+ case CQL_PREPARED:
+ return new CqlMultiGetter(state, index);
+ default:
+ throw new UnsupportedOperationException();
+ }
- case INDEXED_RANGE_SLICE:
- return client.isCQL() ? new CqlIndexedRangeSlicer(client, index) : new IndexedRangeSlicer(client, index);
+ case MIXED:
+ return createOperation(state.readWriteSelector.next(), state, index); // draws a command from state.readWriteSelector and dispatches on it. NOTE(review): presumably the selector never yields MIXED itself, otherwise this call recurses again - confirm
- case MULTI_GET:
- return client.isCQL() ? new CqlMultiGetter(client, index) : new MultiGetter(client, index);
}
throw new UnsupportedOperationException();
}
- public void stopAction()
- {
- stop = true;
- }
}