You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/06/01 18:55:28 UTC
svn commit: r1130223 - in /cassandra/branches/cassandra-0.8/tools/stress:
bin/ src/org/apache/cassandra/stress/
src/org/apache/cassandra/stress/operations/
src/org/apache/cassandra/stress/server/ src/org/apache/cassandra/stress/util/
Author: brandonwilliams
Date: Wed Jun 1 16:55:27 2011
New Revision: 1130223
URL: http://svn.apache.org/viewvc?rev=1130223&view=rev
Log:
stress.java daemon mode.
Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2267
Added:
cassandra/branches/cassandra-0.8/tools/stress/bin/stressd (with props)
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java
- copied, changed from r1130201, cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java (with props)
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java (with props)
Modified:
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
Added: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/bin/stressd?rev=1130223&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/bin/stressd (added)
+++ cassandra/branches/cassandra-0.8/tools/stress/bin/stressd Wed Jun 1 16:55:27 2011
@@ -0,0 +1,107 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Control script for the stress test daemon (org.apache.cassandra.stress.StressServer).
+#
+# Usage: stressd start|stop|status [-h <host>]
+#
+#   start   launch the daemon in the background; its stdout and stderr go to
+#           ./stressd.out.log and ./stressd.err.log, its pid to ./stressd.pid
+#   stop    kill -9 the pid recorded in ./stressd.pid and remove that file
+#   status  report whether the recorded pid still refers to a live process
+#
+# The pid and log files are relative to the current working directory, so
+# stop and status must be run from the directory where start was run.
+
+DESC="Stress Test Daemon"
+
+# Directory holding this script; quoted throughout so paths with spaces work.
+BIN_DIR=`dirname "$0"`
+
+if [ "x$CLASSPATH" = "x" ]; then
+    # Cassandra class files.
+    if [ ! -d "$BIN_DIR/../../../build/classes/main" ]; then
+        echo "Unable to locate cassandra class files" >&2
+        exit 1
+    fi
+
+    # Stress class files.
+    if [ ! -d "$BIN_DIR/../build/classes" ]; then
+        echo "Unable to locate stress class files" >&2
+        exit 1
+    fi
+
+    CLASSPATH=$BIN_DIR/../../../build/classes/main
+    CLASSPATH=$CLASSPATH:$BIN_DIR/../../../build/classes/thrift
+    CLASSPATH=$CLASSPATH:$BIN_DIR/../build/classes
+    for jar in "$BIN_DIR"/../../../lib/*.jar; do
+        CLASSPATH=$CLASSPATH:$jar
+    done
+fi
+
+# Prefer the JVM under JAVA_HOME when it is set; otherwise search PATH.
+if [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+case "$1" in
+    start)
+        echo "Starting $DESC: "
+        # "$@" forwards every argument as one word each (the leading "start"
+        # is passed along too, exactly as before).
+        "$JAVA" -server -cp "$CLASSPATH" org.apache.cassandra.stress.StressServer "$@" 1> ./stressd.out.log 2> ./stressd.err.log &
+        echo $! > ./stressd.pid
+        echo "done."
+        ;;
+
+    stop)
+        PID=`cat ./stressd.pid 2> /dev/null`
+
+        if [ "x$PID" = "x" ]; then
+            echo "$DESC is not running."
+        else
+            kill -9 "$PID"
+            rm -f ./stressd.pid
+            echo "$DESC is stopped."
+        fi
+        ;;
+
+    status)
+        PID=`cat ./stressd.pid 2> /dev/null`
+
+        # kill -0 delivers no signal; it only checks that the pid can be
+        # signalled, so a stale pid file is not reported as a running daemon.
+        if [ "x$PID" = "x" ] || ! kill -0 "$PID" 2> /dev/null; then
+            echo "$DESC is not running."
+        else
+            echo "$DESC is running with pid $PID."
+        fi
+        ;;
+
+    *)
+        echo "Usage: $0 start|stop|status [-h <host>]"
+        ;;
+esac
+
Propchange: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd
------------------------------------------------------------------------------
svn:executable = *
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java Wed Jun 1 16:55:27 2011
@@ -18,6 +18,8 @@
package org.apache.cassandra.stress;
import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +36,7 @@ import org.apache.thrift.transport.TFram
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-public class Session
+public class Session implements Serializable
{
// command line options
public static final Options availableOptions = new Options();
@@ -74,6 +76,7 @@ public class Session
availableOptions.addOption("O", "strategy-properties", true, "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
availableOptions.addOption("W", "no-replicate-on-write",false, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
availableOptions.addOption("V", "average-size-values", false, "Generate column values of average rather than specific size");
+ availableOptions.addOption("T", "send-to", true, "Send this as a request to the stress daemon at specified address.");
}
private int numKeys = 1000 * 1000;
@@ -95,7 +98,7 @@ public class Session
private boolean replicateOnWrite = true;
private boolean ignoreErrors = false;
- private PrintStream out = System.out;
+ private final String outFileName;
private IndexType indexType = null;
private Stress.Operations operation = Stress.Operations.INSERT;
@@ -110,6 +113,8 @@ public class Session
protected int mean;
protected float sigma;
+ public final InetAddress sendToDaemon;
+
public Session(String[] arguments) throws IllegalArgumentException
{
float STDev = 0.1f;
@@ -181,17 +186,7 @@ public class Session
if (cmd.hasOption("r"))
random = true;
- if (cmd.hasOption("f"))
- {
- try
- {
- out = new PrintStream(new FileOutputStream(cmd.getOptionValue("f")));
- }
- catch (FileNotFoundException e)
- {
- System.out.println(e.getMessage());
- }
- }
+ outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null;
if (cmd.hasOption("p"))
port = Integer.parseInt(cmd.getOptionValue("p"));
@@ -264,6 +259,17 @@ public class Session
replicateOnWrite = false;
averageSizeValues = cmd.hasOption("V");
+
+ try
+ {
+ sendToDaemon = cmd.hasOption("send-to")
+ ? InetAddress.getByName(cmd.getOptionValue("send-to"))
+ : null;
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
}
catch (ParseException e)
{
@@ -360,7 +366,14 @@ public class Session
public PrintStream getOutputStream()
{
- return out;
+ try
+ {
+ return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
public int getProgressInterval()
@@ -432,16 +445,16 @@ public class Session
try
{
client.system_add_keyspace(keyspace);
- out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
+ System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
Thread.sleep(nodes.length * 1000); // seconds
}
catch (InvalidRequestException e)
{
- out.println("Unable to create stress keyspace: " + e.getWhy());
+ System.err.println("Unable to create stress keyspace: " + e.getWhy());
}
catch (Exception e)
{
- out.println(e.getMessage());
+ System.err.println(e.getMessage());
}
}
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java Wed Jun 1 16:55:27 2011
@@ -17,15 +17,12 @@
*/
package org.apache.cassandra.stress;
-import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.cli.Option;
-import java.io.PrintStream;
+import java.io.*;
+import java.net.Socket;
+import java.net.SocketException;
import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
public final class Stress
{
@@ -36,17 +33,10 @@ public final class Stress
public static Session session;
public static Random randomizer = new Random();
-
- /**
- * Producer-Consumer model: 1 producer, N consumers
- */
- private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+ private static volatile boolean stopped = false;
public static void main(String[] arguments) throws Exception
{
- long latency, oldLatency;
- int epoch, total, oldTotal, keyCount, oldKeyCount;
-
try
{
session = new Session(arguments);
@@ -57,111 +47,49 @@ public final class Stress
return;
}
- // creating keyspace and column families
- if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD)
- {
- session.createKeySpaces();
- }
-
- int threadCount = session.getThreads();
- Thread[] consumers = new Thread[threadCount];
- PrintStream out = session.getOutputStream();
-
- out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+ PrintStream outStream = session.getOutputStream();
- int itemsPerThread = session.getKeysPerThread();
- int modulo = session.getNumKeys() % threadCount;
-
- // creating required type of the threads for the test
- for (int i = 0; i < threadCount; i++)
+ if (session.sendToDaemon != null)
{
- if (i == threadCount - 1)
- itemsPerThread += modulo; // last one is going to handle N + modulo items
-
- consumers[i] = new Consumer(itemsPerThread);
- }
-
- new Producer().start();
-
- // starting worker threads
- for (int i = 0; i < threadCount; i++)
- {
- consumers[i].start();
- }
+ Socket socket = new Socket(session.sendToDaemon, 2159);
- // initialization of the values
- boolean terminate = false;
- latency = 0;
- epoch = total = keyCount = 0;
+ ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
+ BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- int interval = session.getProgressInterval();
- int epochIntervals = session.getProgressInterval() * 10;
- long testStartTime = System.currentTimeMillis();
+ Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out));
- while (!terminate)
- {
- Thread.sleep(100);
-
- int alive = 0;
- for (Thread thread : consumers)
- if (thread.isAlive()) alive++;
+ out.writeObject(session);
- if (alive == 0)
- terminate = true;
+ String line;
- epoch++;
-
- if (terminate || epoch > epochIntervals)
+ try
{
- epoch = 0;
-
- oldTotal = total;
- oldLatency = latency;
- oldKeyCount = keyCount;
-
- total = session.operations.get();
- keyCount = session.keys.get();
- latency = session.latency.get();
+ while (!socket.isClosed() && (line = inp.readLine()) != null)
+ {
+ if (line.equals("END"))
+ {
+ out.writeInt(1);
+ break;
+ }
- int opDelta = total - oldTotal;
- int keyDelta = keyCount - oldKeyCount;
- double latencyDelta = latency - oldLatency;
+ outStream.println(line);
+ }
+ }
+ catch (SocketException e)
+ {
+ if (!stopped)
+ e.printStackTrace();
+ }
- long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000;
- String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN";
+ out.close();
+ inp.close();
- out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
- }
+ socket.close();
}
- }
-
- private static Operation createOperation(int index)
- {
- switch (session.getOperation())
+ else
{
- case READ:
- return new Reader(index);
-
- case COUNTER_GET:
- return new CounterGetter(index);
-
- case INSERT:
- return new Inserter(index);
-
- case COUNTER_ADD:
- return new CounterAdder(index);
-
- case RANGE_SLICE:
- return new RangeSlicer(index);
-
- case INDEXED_RANGE_SLICE:
- return new IndexedRangeSlicer(index);
-
- case MULTI_GET:
- return new MultiGetter(index);
+ new StressAction(session, outStream).run();
}
-
- throw new UnsupportedOperationException();
}
/**
@@ -180,56 +108,35 @@ public final class Stress
}
}
- /**
- * Produces exactly N items (awaits each to be consumed)
- */
- private static class Producer extends Thread
+ private static class ShutDown extends Thread
{
- public void run()
- {
- for (int i = 0; i < session.getNumKeys(); i++)
- {
- try
- {
- operations.put(createOperation(i % session.getNumDifferentKeys()));
- }
- catch (InterruptedException e)
- {
- System.err.println("Producer error - " + e.getMessage());
- return;
- }
- }
- }
- }
-
- /**
- * Each consumes exactly N items from queue
- */
- private static class Consumer extends Thread
- {
- private final int items;
+ private final Socket socket;
+ private final ObjectOutputStream out;
- public Consumer(int toConsume)
+ public ShutDown(Socket socket, ObjectOutputStream out)
{
- items = toConsume;
+ this.out = out;
+ this.socket = socket;
}
public void run()
{
- Cassandra.Client client = session.getClient();
-
- for (int i = 0; i < items; i++)
+ try
{
- try
- {
- operations.take().run(client); // running job
- }
- catch (Exception e)
+ if (!socket.isClosed())
{
- System.err.println(e.getMessage());
- System.exit(-1);
+ System.out.println("Control-C caught. Canceling running action and shutting down...");
+
+ out.writeInt(1);
+ out.close();
+
+ stopped = true;
}
}
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
}
}
Copied: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java (from r1130201, cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java)
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java?p2=cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java&p1=cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java&r1=1130201&r2=1130223&rev=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java Wed Jun 1 16:55:27 2011
@@ -17,90 +17,93 @@
*/
package org.apache.cassandra.stress;
-import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.commons.cli.Option;
-
import java.io.PrintStream;
-import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
-public final class Stress
-{
- public static enum Operations
- {
- INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET
- }
-
- public static Session session;
- public static Random randomizer = new Random();
+import org.apache.cassandra.stress.operations.*;
+import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.thrift.Cassandra;
+public class StressAction extends Thread
+{
/**
* Producer-Consumer model: 1 producer, N consumers
*/
- private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+ private final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+
+ private final Session client;
+ private final PrintStream output;
- public static void main(String[] arguments) throws Exception
+ private volatile boolean stop = false;
+
+ public StressAction(Session session, PrintStream out)
+ {
+ client = session;
+ output = out;
+ }
+
+ public void run()
{
long latency, oldLatency;
int epoch, total, oldTotal, keyCount, oldKeyCount;
- try
- {
- session = new Session(arguments);
- }
- catch (IllegalArgumentException e)
- {
- printHelpMessage();
- return;
- }
-
// creating keyspace and column families
- if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD)
- {
- session.createKeySpaces();
- }
+ if (client.getOperation() == Stress.Operations.INSERT || client.getOperation() == Stress.Operations.COUNTER_ADD)
+ client.createKeySpaces();
- int threadCount = session.getThreads();
- Thread[] consumers = new Thread[threadCount];
- PrintStream out = session.getOutputStream();
+ int threadCount = client.getThreads();
+ Consumer[] consumers = new Consumer[threadCount];
- out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+ output.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
- int itemsPerThread = session.getKeysPerThread();
- int modulo = session.getNumKeys() % threadCount;
+ int itemsPerThread = client.getKeysPerThread();
+ int modulo = client.getNumKeys() % threadCount;
// creating required type of the threads for the test
- for (int i = 0; i < threadCount; i++)
- {
+ for (int i = 0; i < threadCount; i++) {
if (i == threadCount - 1)
itemsPerThread += modulo; // last one is going to handle N + modulo items
consumers[i] = new Consumer(itemsPerThread);
}
- new Producer().start();
+ Producer producer = new Producer();
+ producer.start();
// starting worker threads
for (int i = 0; i < threadCount; i++)
- {
consumers[i].start();
- }
// initialization of the values
boolean terminate = false;
latency = 0;
epoch = total = keyCount = 0;
- int interval = session.getProgressInterval();
- int epochIntervals = session.getProgressInterval() * 10;
+ int interval = client.getProgressInterval();
+ int epochIntervals = client.getProgressInterval() * 10;
long testStartTime = System.currentTimeMillis();
while (!terminate)
{
- Thread.sleep(100);
+ if (stop)
+ {
+ producer.stopProducer();
+
+ for (Consumer consumer : consumers)
+ consumer.stopConsume();
+
+ break;
+ }
+
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
int alive = 0;
for (Thread thread : consumers)
@@ -115,83 +118,46 @@ public final class Stress
{
epoch = 0;
- oldTotal = total;
- oldLatency = latency;
+ oldTotal = total;
+ oldLatency = latency;
oldKeyCount = keyCount;
- total = session.operations.get();
- keyCount = session.keys.get();
- latency = session.latency.get();
+ total = client.operations.get();
+ keyCount = client.keys.get();
+ latency = client.latency.get();
- int opDelta = total - oldTotal;
+ int opDelta = total - oldTotal;
int keyDelta = keyCount - oldKeyCount;
double latencyDelta = latency - oldLatency;
long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000;
String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN";
- out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
+ output.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
}
}
- }
-
- private static Operation createOperation(int index)
- {
- switch (session.getOperation())
- {
- case READ:
- return new Reader(index);
-
- case COUNTER_GET:
- return new CounterGetter(index);
-
- case INSERT:
- return new Inserter(index);
- case COUNTER_ADD:
- return new CounterAdder(index);
-
- case RANGE_SLICE:
- return new RangeSlicer(index);
-
- case INDEXED_RANGE_SLICE:
- return new IndexedRangeSlicer(index);
-
- case MULTI_GET:
- return new MultiGetter(index);
- }
-
- throw new UnsupportedOperationException();
- }
-
- /**
- * Printing out help message
- */
- public static void printHelpMessage()
- {
- System.out.println("Usage: ./bin/stress [options]\n\nOptions:");
-
- for(Object o : Session.availableOptions.getOptions())
- {
- Option option = (Option) o;
- String upperCaseName = option.getLongOpt().toUpperCase();
- System.out.println(String.format("-%s%s, --%s%s%n\t\t%s%n", option.getOpt(), (option.hasArg()) ? " "+upperCaseName : "",
- option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
- }
+ // marking an end of the output to the client
+ output.println("END");
}
/**
* Produces exactly N items (awaits each to be consumed)
*/
- private static class Producer extends Thread
+ private class Producer extends Thread
{
+ private volatile boolean stop = false;
+
public void run()
{
- for (int i = 0; i < session.getNumKeys(); i++)
+ for (int i = 0; i < client.getNumKeys(); i++)
{
+ if (stop)
+ break;
+
try
{
- operations.put(createOperation(i % session.getNumDifferentKeys()));
+ operations.put(createOperation(i % client.getNumDifferentKeys()));
}
catch (InterruptedException e)
{
@@ -200,14 +166,20 @@ public final class Stress
}
}
}
+
+ public void stopProducer()
+ {
+ stop = true;
+ }
}
/**
* Each consumes exactly N items from queue
*/
- private static class Consumer extends Thread
+ private class Consumer extends Thread
{
private final int items;
+ private volatile boolean stop = false;
public Consumer(int toConsume)
{
@@ -216,21 +188,69 @@ public final class Stress
public void run()
{
- Cassandra.Client client = session.getClient();
+ Cassandra.Client connection = client.getClient();
for (int i = 0; i < items; i++)
{
+ if (stop)
+ break;
+
try
{
- operations.take().run(client); // running job
+ operations.take().run(connection); // running job
}
catch (Exception e)
{
- System.err.println(e.getMessage());
- System.exit(-1);
+ if (output == null)
+ {
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+
+
+ output.println(e.getMessage());
+ break;
}
}
}
+
+ public void stopConsume()
+ {
+ stop = true;
+ }
+ }
+
+ private Operation createOperation(int index)
+ {
+ switch (client.getOperation())
+ {
+ case READ:
+ return new Reader(client, index);
+
+ case COUNTER_GET:
+ return new CounterGetter(client, index);
+
+ case INSERT:
+ return new Inserter(client, index);
+
+ case COUNTER_ADD:
+ return new CounterAdder(client, index);
+
+ case RANGE_SLICE:
+ return new RangeSlicer(client, index);
+
+ case INDEXED_RANGE_SLICE:
+ return new IndexedRangeSlicer(client, index);
+
+ case MULTI_GET:
+ return new MultiGetter(client, index);
+ }
+
+ throw new UnsupportedOperationException();
}
+ public void stopAction()
+ {
+ stop = true;
+ }
}
Added: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java?rev=1130223&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java (added)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java Wed Jun 1 16:55:27 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+
+import org.apache.cassandra.stress.server.StressThread;
+import org.apache.commons.cli.*;
+
+/**
+ * Entry point of the stress daemon: listens on a TCP port and hands every
+ * accepted connection to its own {@link StressThread}.
+ */
+public class StressServer
+{
+    /** TCP port the daemon listens on. */
+    private static final int PORT = 2159;
+
+    private static final Options availableOptions = new Options();
+
+    static
+    {
+        availableOptions.addOption("h", "host", true, "Host to listen for connections.");
+    }
+
+    /**
+     * Parses the command line, binds the listening socket and serves
+     * connections until the JVM is terminated.
+     *
+     * @param args command line; {@code -h <host>} selects the address to bind (default 127.0.0.1)
+     * @throws Exception if the host cannot be resolved or accepting a connection fails
+     */
+    public static void main(String[] args) throws Exception
+    {
+        CommandLineParser parser = new PosixParser();
+
+        InetAddress address = InetAddress.getByName("127.0.0.1");
+
+        try
+        {
+            CommandLine cmd = parser.parse(availableOptions, args);
+
+            if (cmd.hasOption("h"))
+            {
+                address = InetAddress.getByName(cmd.getOptionValue("h"));
+            }
+        }
+        catch (ParseException e)
+        {
+            // println rather than printf so the usage text ends with a newline
+            System.err.println("Usage: ./bin/stressd start|stop|status [-h <host>]");
+            System.exit(1);
+        }
+
+        ServerSocket serverSocket = null;
+
+        try
+        {
+            serverSocket = new ServerSocket(PORT, 0, address);
+        }
+        catch (IOException e)
+        {
+            System.err.printf("Could not listen on port: %s:%d (%s).%n", address.getHostAddress(), PORT, e.getMessage());
+            System.exit(1);
+        }
+
+        try
+        {
+            for (;;)
+                new StressThread(serverSocket.accept()).start();
+        }
+        finally
+        {
+            // the loop only ends when accept() throws; release the port on the way out
+            serverSocket.close();
+        }
+    }
+}
Propchange: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
public class CounterAdder extends Operation
{
- public CounterAdder(int index)
+ public CounterAdder(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -27,9 +28,9 @@ import java.util.List;
public class CounterGetter extends Operation
{
- public CounterGetter(int index)
+ public CounterGetter(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -30,9 +31,9 @@ public class IndexedRangeSlicer extends
{
private static List<ByteBuffer> values = null;
- public IndexedRangeSlicer(int index)
+ public IndexedRangeSlicer(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -33,9 +34,9 @@ public class Inserter extends Operation
{
private static List<ByteBuffer> values;
- public Inserter(int index)
+ public Inserter(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
public class MultiGetter extends Operation
{
- public MultiGetter(int index)
+ public MultiGetter(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -30,9 +31,9 @@ import java.util.List;
public class RangeSlicer extends Operation
{
- public RangeSlicer(int index)
+ public RangeSlicer(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Wed Jun 1 16:55:27 2011
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.stress.operations;
+import org.apache.cassandra.stress.Session;
import org.apache.cassandra.stress.util.Operation;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.thrift.*;
@@ -29,9 +30,9 @@ import static com.google.common.base.Cha
public class Reader extends Operation
{
- public Reader(int index)
+ public Reader(Session client, int index)
{
- super(index);
+ super(client, index);
}
public void run(Cassandra.Client client) throws IOException
Added: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java?rev=1130223&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java (added)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java Wed Jun 1 16:55:27 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.server;
+
+import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.StressAction;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+/** Thread that handles one socket connection: reads a Session from it, runs that session as a StressAction and watches the socket for a stop request. */
+public class StressThread extends Thread
+{
+ private final Socket socket;
+ /** @param client socket the Session is read from; its output stream is handed to the StressAction */
+ public StressThread(Socket client)
+ {
+ this.socket = client;
+ }
+ /** Starts the action for the received Session and loops until it is no longer alive or an int value of 1 is read from the socket. */
+ public void run()
+ {
+ try
+ {
+ ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
+ PrintStream out = new PrintStream(socket.getOutputStream());
+ // the first object read from the stream is cast to Session; the action is given the socket's output stream
+ StressAction action = new StressAction((Session) in.readObject(), out);
+ action.start();
+ // while the action is alive, keep reading ints from the socket; the value 1 stops the action and leaves the loop
+ while (action.isAlive())
+ {
+ try
+ {
+ if (in.readInt() == 1)
+ {
+ action.stopAction();
+ break;
+ }
+ }
+ catch (Exception e) // NOTE(review): also catches non-I/O exceptions raised inside this inner try -- confirm that is intended
+ {
+ // ignored: the loop just re-checks action.isAlive(); NOTE(review): presumably spins if readInt() keeps failing at once (e.g. stream at EOF) -- confirm
+ }
+ }
+ // reached after a stop request or once the action is no longer alive; not executed if an exception escapes the code above
+ out.close();
+ in.close();
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
Propchange: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Wed Jun 1 16:55:27 2011
@@ -46,6 +46,12 @@ public abstract class Operation
session = Stress.session;
}
+ public Operation(Session client, int idx)
+ {
+ index = idx;
+ session = client;
+ }
+
/**
* Run operation
* @param client Cassandra Thrift client connection
@@ -101,18 +107,18 @@ public abstract class Operation
* key generator using Gauss or Random algorithm
* @return byte[] representation of the key string
*/
- protected static byte[] generateKey()
+ protected byte[] generateKey()
{
- return (Stress.session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
+ return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
}
/**
* Random key generator
* @return byte[] representation of the key string
*/
- private static byte[] generateRandomKey()
+ private byte[] generateRandomKey()
{
- String format = "%0" + Stress.session.getTotalKeysLength() + "d";
+ String format = "%0" + session.getTotalKeysLength() + "d";
return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8);
}
@@ -120,9 +126,8 @@ public abstract class Operation
* Gauss key generator
* @return byte[] representation of the key string
*/
- private static byte[] generateGaussKey()
+ private byte[] generateGaussKey()
{
- Session session = Stress.session;
String format = "%0" + session.getTotalKeysLength() + "d";
for (;;)