You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/06/01 18:55:28 UTC

svn commit: r1130223 - in /cassandra/branches/cassandra-0.8/tools/stress: bin/ src/org/apache/cassandra/stress/ src/org/apache/cassandra/stress/operations/ src/org/apache/cassandra/stress/server/ src/org/apache/cassandra/stress/util/

Author: brandonwilliams
Date: Wed Jun  1 16:55:27 2011
New Revision: 1130223

URL: http://svn.apache.org/viewvc?rev=1130223&view=rev
Log:
stress.java daemon mode.
Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2267

Added:
    cassandra/branches/cassandra-0.8/tools/stress/bin/stressd   (with props)
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java
      - copied, changed from r1130201, cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java   (with props)
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java   (with props)
Modified:
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
    cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java

Added: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/bin/stressd?rev=1130223&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/bin/stressd (added)
+++ cassandra/branches/cassandra-0.8/tools/stress/bin/stressd Wed Jun  1 16:55:27 2011
@@ -0,0 +1,87 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+DESC="Stress Test Daemon"
+
+if [ "x$CLASSPATH" = "x" ]; then
+    # Cassandra class files.
+    if [ ! -d `dirname $0`/../../../build/classes/main ]; then
+        echo "Unable to locate cassandra class files" >&2
+        exit 1
+    fi
+
+    # Stress class files.
+    if [ ! -d `dirname $0`/../build/classes ]; then
+        echo "Unable to locate stress class files" >&2
+        exit 1
+    fi
+
+    CLASSPATH=`dirname $0`/../../../build/classes/main
+    CLASSPATH=$CLASSPATH:`dirname $0`/../../../build/classes/thrift
+    CLASSPATH=$CLASSPATH:`dirname $0`/../build/classes
+    for jar in `dirname $0`/../../../lib/*.jar; do
+        CLASSPATH=$CLASSPATH:$jar
+    done
+fi
+
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+case "$1" in
+  start)
+    echo "Starting $DESC: "
+    $JAVA -server -cp $CLASSPATH org.apache.cassandra.stress.StressServer $@ 1> ./stressd.out.log 2> ./stressd.err.log &
+    echo $! > ./stressd.pid
+    echo "done."
+  ;;
+  
+  stop)
+    PID=`cat ./stressd.pid 2> /dev/null`
+    
+    if [ "x$PID" = "x" ]; then
+      echo "$DESC is not running."
+    else
+      kill -9 $PID
+      rm ./stressd.pid
+      echo "$DESC is stopped."
+    fi
+  ;;
+
+  status)
+    PID=`cat ./stressd.pid 2> /dev/null`
+
+    if [ "x$PID" = "x" ]; then
+      echo "$DESC is not running."
+    else
+      echo "$DESC is running with pid $PID."
+    fi
+  ;;
+
+  *)
+    echo "Usage: $0 start|stop|status [-h <host>]"
+  ;;
+esac
+

Propchange: cassandra/branches/cassandra-0.8/tools/stress/bin/stressd
------------------------------------------------------------------------------
    svn:executable = *

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Session.java Wed Jun  1 16:55:27 2011
@@ -18,6 +18,8 @@
 package org.apache.cassandra.stress;
 
 import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,7 +36,7 @@ import org.apache.thrift.transport.TFram
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 
-public class Session
+public class Session implements Serializable
 {
     // command line options
     public static final Options availableOptions = new Options();
@@ -74,6 +76,7 @@ public class Session
         availableOptions.addOption("O",  "strategy-properties",  true,   "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
         availableOptions.addOption("W",  "no-replicate-on-write",false,  "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work");
         availableOptions.addOption("V",  "average-size-values",  false,  "Generate column values of average rather than specific size");
+        availableOptions.addOption("T",  "send-to",              true,   "Send this as a request to the stress daemon at specified address.");
     }
 
     private int numKeys          = 1000 * 1000;
@@ -95,7 +98,7 @@ public class Session
     private boolean replicateOnWrite = true;
     private boolean ignoreErrors  = false;
 
-    private PrintStream out = System.out;
+    private final String outFileName;
 
     private IndexType indexType = null;
     private Stress.Operations operation = Stress.Operations.INSERT;
@@ -110,6 +113,8 @@ public class Session
     protected int   mean;
     protected float sigma;
 
+    public final InetAddress sendToDaemon;
+
     public Session(String[] arguments) throws IllegalArgumentException
     {
         float STDev = 0.1f;
@@ -181,17 +186,7 @@ public class Session
             if (cmd.hasOption("r"))
                 random = true;
 
-            if (cmd.hasOption("f"))
-            {
-                try
-                {
-                    out = new PrintStream(new FileOutputStream(cmd.getOptionValue("f")));
-                }
-                catch (FileNotFoundException e)
-                {
-                    System.out.println(e.getMessage());
-                }
-            }
+            outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null;
 
             if (cmd.hasOption("p"))
                 port = Integer.parseInt(cmd.getOptionValue("p"));
@@ -264,6 +259,17 @@ public class Session
                 replicateOnWrite = false;
 
             averageSizeValues = cmd.hasOption("V");
+
+            try
+            {
+                sendToDaemon = cmd.hasOption("send-to")
+                                ? InetAddress.getByName(cmd.getOptionValue("send-to"))
+                                : null;
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
         catch (ParseException e)
         {
@@ -360,7 +366,14 @@ public class Session
 
     public PrintStream getOutputStream()
     {
-        return out;
+        try
+        {
+            return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName));
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e.getMessage(), e);
+        }
     }
 
     public int getProgressInterval()
@@ -432,16 +445,16 @@ public class Session
         try
         {
             client.system_add_keyspace(keyspace);
-            out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
+            System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length));
             Thread.sleep(nodes.length * 1000); // seconds
         }
         catch (InvalidRequestException e)
         {
-            out.println("Unable to create stress keyspace: " + e.getWhy());
+            System.err.println("Unable to create stress keyspace: " + e.getWhy());
         }
         catch (Exception e)
         {
-            out.println(e.getMessage());
+            System.err.println(e.getMessage());
         }
     }
 

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java Wed Jun  1 16:55:27 2011
@@ -17,15 +17,12 @@
  */
 package org.apache.cassandra.stress;
 
-import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.commons.cli.Option;
 
-import java.io.PrintStream;
+import java.io.*;
+import java.net.Socket;
+import java.net.SocketException;
 import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
 
 public final class Stress
 {
@@ -36,17 +33,10 @@ public final class Stress
 
     public static Session session;
     public static Random randomizer = new Random();
-
-    /**
-     * Producer-Consumer model: 1 producer, N consumers
-     */
-    private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+    private static volatile boolean stopped = false;
 
     public static void main(String[] arguments) throws Exception
     {
-        long latency, oldLatency;
-        int epoch, total, oldTotal, keyCount, oldKeyCount;
-
         try
         {
             session = new Session(arguments);
@@ -57,111 +47,49 @@ public final class Stress
             return;
         }
 
-        // creating keyspace and column families
-        if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD)
-        {
-            session.createKeySpaces();
-        }
-
-        int threadCount  = session.getThreads();
-        Thread[] consumers = new Thread[threadCount];
-        PrintStream out = session.getOutputStream();
-
-        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+        PrintStream outStream = session.getOutputStream();
 
-        int itemsPerThread = session.getKeysPerThread();
-        int modulo = session.getNumKeys() % threadCount;
-
-        // creating required type of the threads for the test
-        for (int i = 0; i < threadCount; i++)
+        if (session.sendToDaemon != null)
         {
-            if (i == threadCount - 1)
-                itemsPerThread += modulo; // last one is going to handle N + modulo items
-
-            consumers[i] = new Consumer(itemsPerThread);
-        }
-
-        new Producer().start();
-
-        // starting worker threads
-        for (int i = 0; i < threadCount; i++)
-        {
-            consumers[i].start();
-        }
+            Socket socket = new Socket(session.sendToDaemon, 2159);
 
-        // initialization of the values
-        boolean terminate = false;
-        latency = 0;
-        epoch = total = keyCount = 0;
+            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
+            BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
 
-        int interval = session.getProgressInterval();
-        int epochIntervals = session.getProgressInterval() * 10;
-        long testStartTime = System.currentTimeMillis();
+            Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out));
 
-        while (!terminate)
-        {
-            Thread.sleep(100);
-
-            int alive = 0;
-            for (Thread thread : consumers)
-                if (thread.isAlive()) alive++;
+            out.writeObject(session);
 
-            if (alive == 0)
-                terminate = true;
+            String line;
 
-            epoch++;
-
-            if (terminate || epoch > epochIntervals)
+            try
             {
-                epoch = 0;
-
-                oldTotal    = total;
-                oldLatency  = latency;
-                oldKeyCount = keyCount;
-
-                total    = session.operations.get();
-                keyCount = session.keys.get();
-                latency  = session.latency.get();
+                while (!socket.isClosed() && (line = inp.readLine()) != null)
+                {
+                    if (line.equals("END"))
+                    {
+                        out.writeInt(1);
+                        break;
+                    }
 
-                int opDelta  = total - oldTotal;
-                int keyDelta = keyCount - oldKeyCount;
-                double latencyDelta = latency - oldLatency;
+                    outStream.println(line);
+                }
+            }
+            catch (SocketException e)
+            {
+                if (!stopped)
+                    e.printStackTrace();
+            }
 
-                long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000;
-                String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN";
+            out.close();
+            inp.close();
 
-                out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
-            }
+            socket.close();
         }
-    }
-
-    private static Operation createOperation(int index)
-    {
-        switch (session.getOperation())
+        else
         {
-            case READ:
-                return new Reader(index);
-
-            case COUNTER_GET:
-                return new CounterGetter(index);
-
-            case INSERT:
-                return new Inserter(index);
-
-            case COUNTER_ADD:
-                return new CounterAdder(index);
-
-            case RANGE_SLICE:
-                return new RangeSlicer(index);
-
-            case INDEXED_RANGE_SLICE:
-                return new IndexedRangeSlicer(index);
-
-            case MULTI_GET:
-                return new MultiGetter(index);
+            new StressAction(session, outStream).run();
         }
-
-        throw new UnsupportedOperationException();
     }
 
     /**
@@ -180,56 +108,35 @@ public final class Stress
         }
     }
 
-    /**
-     * Produces exactly N items (awaits each to be consumed)
-     */
-    private static class Producer extends Thread
+    private static class ShutDown extends Thread
     {
-        public void run()
-        {
-            for (int i = 0; i < session.getNumKeys(); i++)
-            {
-                try
-                {
-                    operations.put(createOperation(i % session.getNumDifferentKeys()));
-                }
-                catch (InterruptedException e)
-                {
-                    System.err.println("Producer error - " + e.getMessage());
-                    return;
-                }
-            }
-        }
-    }
-
-    /**
-     * Each consumes exactly N items from queue
-     */
-    private static class Consumer extends Thread
-    {
-        private final int items;
+        private final Socket socket;
+        private final ObjectOutputStream out;
 
-        public Consumer(int toConsume)
+        public ShutDown(Socket socket, ObjectOutputStream out)
         {
-            items = toConsume;
+            this.out = out;
+            this.socket = socket;
         }
 
         public void run()
         {
-            Cassandra.Client client = session.getClient();
-
-            for (int i = 0; i < items; i++)
+            try
             {
-                try
-                {
-                    operations.take().run(client); // running job
-                }
-                catch (Exception e)
+                if (!socket.isClosed())
                 {
-                    System.err.println(e.getMessage());
-                    System.exit(-1);
+                    System.out.println("Control-C caught. Canceling running action and shutting down...");
+
+                    out.writeInt(1);
+                    out.close();
+
+                    stopped = true;
                 }
             }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
         }
     }
 

Copied: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java (from r1130201, cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java)
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java?p2=cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java&p1=cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java&r1=1130201&r2=1130223&rev=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/Stress.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressAction.java Wed Jun  1 16:55:27 2011
@@ -17,90 +17,93 @@
  */
 package org.apache.cassandra.stress;
 
-import org.apache.cassandra.stress.operations.*;
-import org.apache.cassandra.stress.util.Operation;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.commons.cli.Option;
-
 import java.io.PrintStream;
-import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 
-public final class Stress
-{
-    public static enum Operations
-    {
-        INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET, COUNTER_ADD, COUNTER_GET
-    }
-
-    public static Session session;
-    public static Random randomizer = new Random();
+import org.apache.cassandra.stress.operations.*;
+import org.apache.cassandra.stress.util.Operation;
+import org.apache.cassandra.thrift.Cassandra;
 
+public class StressAction extends Thread
+{
     /**
      * Producer-Consumer model: 1 producer, N consumers
      */
-    private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+    private final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+
+    private final Session client;
+    private final PrintStream output;
 
-    public static void main(String[] arguments) throws Exception
+    private volatile boolean stop = false;
+
+    public StressAction(Session session, PrintStream out)
+    {
+        client = session;
+        output = out;
+    }
+
+    public void run()
     {
         long latency, oldLatency;
         int epoch, total, oldTotal, keyCount, oldKeyCount;
 
-        try
-        {
-            session = new Session(arguments);
-        }
-        catch (IllegalArgumentException e)
-        {
-            printHelpMessage();
-            return;
-        }
-
         // creating keyspace and column families
-        if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD)
-        {
-            session.createKeySpaces();
-        }
+        if (client.getOperation() == Stress.Operations.INSERT || client.getOperation() == Stress.Operations.COUNTER_ADD)
+            client.createKeySpaces();
 
-        int threadCount  = session.getThreads();
-        Thread[] consumers = new Thread[threadCount];
-        PrintStream out = session.getOutputStream();
+        int threadCount = client.getThreads();
+        Consumer[] consumers = new Consumer[threadCount];
 
-        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+        output.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
 
-        int itemsPerThread = session.getKeysPerThread();
-        int modulo = session.getNumKeys() % threadCount;
+        int itemsPerThread = client.getKeysPerThread();
+        int modulo = client.getNumKeys() % threadCount;
 
         // creating required type of the threads for the test
-        for (int i = 0; i < threadCount; i++)
-        {
+        for (int i = 0; i < threadCount; i++) {
             if (i == threadCount - 1)
                 itemsPerThread += modulo; // last one is going to handle N + modulo items
 
             consumers[i] = new Consumer(itemsPerThread);
         }
 
-        new Producer().start();
+        Producer producer = new Producer();
+        producer.start();
 
         // starting worker threads
         for (int i = 0; i < threadCount; i++)
-        {
             consumers[i].start();
-        }
 
         // initialization of the values
         boolean terminate = false;
         latency = 0;
         epoch = total = keyCount = 0;
 
-        int interval = session.getProgressInterval();
-        int epochIntervals = session.getProgressInterval() * 10;
+        int interval = client.getProgressInterval();
+        int epochIntervals = client.getProgressInterval() * 10;
         long testStartTime = System.currentTimeMillis();
 
         while (!terminate)
         {
-            Thread.sleep(100);
+            if (stop)
+            {
+                producer.stopProducer();
+
+                for (Consumer consumer : consumers)
+                    consumer.stopConsume();
+
+                break;
+            }
+
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e.getMessage(), e);
+            }
 
             int alive = 0;
             for (Thread thread : consumers)
@@ -115,83 +118,46 @@ public final class Stress
             {
                 epoch = 0;
 
-                oldTotal    = total;
-                oldLatency  = latency;
+                oldTotal = total;
+                oldLatency = latency;
                 oldKeyCount = keyCount;
 
-                total    = session.operations.get();
-                keyCount = session.keys.get();
-                latency  = session.latency.get();
+                total = client.operations.get();
+                keyCount = client.keys.get();
+                latency = client.latency.get();
 
-                int opDelta  = total - oldTotal;
+                int opDelta = total - oldTotal;
                 int keyDelta = keyCount - oldKeyCount;
                 double latencyDelta = latency - oldLatency;
 
                 long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000;
                 String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN";
 
-                out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
+                output.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds));
             }
         }
-    }
-
-    private static Operation createOperation(int index)
-    {
-        switch (session.getOperation())
-        {
-            case READ:
-                return new Reader(index);
-
-            case COUNTER_GET:
-                return new CounterGetter(index);
-
-            case INSERT:
-                return new Inserter(index);
 
-            case COUNTER_ADD:
-                return new CounterAdder(index);
-
-            case RANGE_SLICE:
-                return new RangeSlicer(index);
-
-            case INDEXED_RANGE_SLICE:
-                return new IndexedRangeSlicer(index);
-
-            case MULTI_GET:
-                return new MultiGetter(index);
-        }
-
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Printing out help message
-     */
-    public static void printHelpMessage()
-    {
-        System.out.println("Usage: ./bin/stress [options]\n\nOptions:");
-
-        for(Object o : Session.availableOptions.getOptions())
-        {
-            Option option = (Option) o;
-            String upperCaseName = option.getLongOpt().toUpperCase();
-            System.out.println(String.format("-%s%s, --%s%s%n\t\t%s%n", option.getOpt(), (option.hasArg()) ? " "+upperCaseName : "",
-                                                            option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
-        }
+        // marking an end of the output to the client
+        output.println("END");
     }
 
     /**
      * Produces exactly N items (awaits each to be consumed)
      */
-    private static class Producer extends Thread
+    private class Producer extends Thread
     {
+        private volatile boolean stop = false;
+
         public void run()
         {
-            for (int i = 0; i < session.getNumKeys(); i++)
+            for (int i = 0; i < client.getNumKeys(); i++)
             {
+                if (stop)
+                    break;
+
                 try
                 {
-                    operations.put(createOperation(i % session.getNumDifferentKeys()));
+                    operations.put(createOperation(i % client.getNumDifferentKeys()));
                 }
                 catch (InterruptedException e)
                 {
@@ -200,14 +166,20 @@ public final class Stress
                 }
             }
         }
+
+        public void stopProducer()
+        {
+            stop = true;
+        }
     }
 
     /**
      * Each consumes exactly N items from queue
      */
-    private static class Consumer extends Thread
+    private class Consumer extends Thread
     {
         private final int items;
+        private volatile boolean stop = false;
 
         public Consumer(int toConsume)
         {
@@ -216,21 +188,69 @@ public final class Stress
 
         public void run()
         {
-            Cassandra.Client client = session.getClient();
+            Cassandra.Client connection = client.getClient();
 
             for (int i = 0; i < items; i++)
             {
+                if (stop)
+                    break;
+
                 try
                 {
-                    operations.take().run(client); // running job
+                    operations.take().run(connection); // running job
                 }
                 catch (Exception e)
                 {
-                    System.err.println(e.getMessage());
-                    System.exit(-1);
+                    if (output == null)
+                    {
+                        System.err.println(e.getMessage());
+                        System.exit(-1);
+                    }
+
+
+                    output.println(e.getMessage());
+                    break;
                 }
             }
         }
+
+        public void stopConsume()
+        {
+            stop = true;
+        }
+    }
+
+    private Operation createOperation(int index)
+    {
+        switch (client.getOperation())
+        {
+            case READ:
+                return new Reader(client, index);
+
+            case COUNTER_GET:
+                return new CounterGetter(client, index);
+
+            case INSERT:
+                return new Inserter(client, index);
+
+            case COUNTER_ADD:
+                return new CounterAdder(client, index);
+
+            case RANGE_SLICE:
+                return new RangeSlicer(client, index);
+
+            case INDEXED_RANGE_SLICE:
+                return new IndexedRangeSlicer(client, index);
+
+            case MULTI_GET:
+                return new MultiGetter(client, index);
+        }
+
+        throw new UnsupportedOperationException();
     }
 
+    public void stopAction()
+    {
+        stop = true;
+    }
 }

Added: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java?rev=1130223&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java (added)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java Wed Jun  1 16:55:27 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+
+import org.apache.cassandra.stress.server.StressThread;
+import org.apache.commons.cli.*;
+
+/**
+ * Entry point of the stress daemon: listens on a TCP port and hands every
+ * accepted connection to its own {@link StressThread}.
+ */
+public class StressServer
+{
+    /** TCP port the daemon listens on. */
+    private static final int PORT = 2159;
+
+    private static final Options availableOptions = new Options();
+
+    static
+    {
+        availableOptions.addOption("h", "host", true, "Host to listen for connections.");
+    }
+
+    /**
+     * Parses the command line, binds the listening socket and serves connections forever.
+     *
+     * @param args command line arguments; only -h/--host (address to bind, default 127.0.0.1) is recognized
+     * @throws Exception if the host cannot be resolved or accepting a connection fails
+     */
+    public static void main(String[] args) throws Exception
+    {
+        ServerSocket serverSocket = null;
+        CommandLineParser parser  = new PosixParser();
+
+        InetAddress address = InetAddress.getByName("127.0.0.1");
+
+        try
+        {
+            CommandLine cmd = parser.parse(availableOptions, args);
+
+            if (cmd.hasOption("h"))
+            {
+                address = InetAddress.getByName(cmd.getOptionValue("h"));
+            }
+        }
+        catch (ParseException e)
+        {
+            // println rather than printf: the usage text has no format arguments and needs a trailing newline
+            System.err.println("Usage: ./bin/stressd start|stop|status [-h <host>]");
+            System.exit(1);
+        }
+
+        try
+        {
+            serverSocket = new ServerSocket(PORT, 0, address);
+        }
+        catch (IOException e)
+        {
+            System.err.printf("Could not listen on port: %s:%d.%n", address.getHostAddress(), PORT);
+            System.exit(1);
+        }
+
+        // one thread per client; the loop only ends when accept() throws
+        for (;;)
+            new StressThread(serverSocket.accept()).start();
+    }
+}

Propchange: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/StressServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
 
 public class CounterAdder extends Operation
 {
-    public CounterAdder(int index)
+    public CounterAdder(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -27,9 +28,9 @@ import java.util.List;
 
 public class CounterGetter extends Operation
 {
-    public CounterGetter(int index)
+    public CounterGetter(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -30,9 +31,9 @@ public class IndexedRangeSlicer extends 
 {
     private static List<ByteBuffer> values = null;
 
-    public IndexedRangeSlicer(int index)
+    public IndexedRangeSlicer(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -33,9 +34,9 @@ public class Inserter extends Operation
 {
     private static List<ByteBuffer> values;
 
-    public Inserter(int index)
+    public Inserter(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -31,9 +32,9 @@ import java.util.Map;
 
 public class MultiGetter extends Operation
 {
-    public MultiGetter(int index)
+    public MultiGetter(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -30,9 +31,9 @@ import java.util.List;
 public class RangeSlicer extends Operation
 {
 
-    public RangeSlicer(int index)
+    public RangeSlicer(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Wed Jun  1 16:55:27 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.stress.operations;
 
+import org.apache.cassandra.stress.Session;
 import org.apache.cassandra.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
@@ -29,9 +30,9 @@ import static com.google.common.base.Cha
 
 public class Reader extends Operation
 {
-    public Reader(int index)
+    public Reader(Session client, int index)
     {
-        super(index);
+        super(client, index);
     }
 
     public void run(Cassandra.Client client) throws IOException

Added: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java?rev=1130223&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java (added)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java Wed Jun  1 16:55:27 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.stress.server;
+
+import org.apache.cassandra.stress.Session;
+import org.apache.cassandra.stress.StressAction;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+
+/**
+ * Serves one stress client connection: reads the serialized {@link Session} the
+ * client sends, runs it as a {@link StressAction} that is handed a PrintStream over
+ * the socket, and stops the action when the client sends the int 1.
+ */
+public class StressThread extends Thread
+{
+    private final Socket socket;
+
+    /**
+     * @param client accepted connection to serve; it is closed when run() finishes
+     */
+    public StressThread(Socket client)
+    {
+        this.socket = client;
+    }
+
+    /** Runs the client's session to completion (or until asked to stop), then closes the socket. */
+    public void run()
+    {
+        try
+        {
+            // NOTE(review): this deserializes whatever the peer sends; the port must only be
+            // reachable by trusted clients (consider restricting the accepted classes).
+            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
+            PrintStream out = new PrintStream(socket.getOutputStream());
+
+            StressAction action = new StressAction((Session) in.readObject(), out);
+            action.start();
+
+            while (action.isAlive())
+            {
+                try
+                {
+                    if (in.readInt() == 1)
+                    {
+                        action.stopAction();
+                        break;
+                    }
+                }
+                catch (Exception e)
+                {
+                    // The control stream is finished or broken, so another readInt() would fail
+                    // the same way and this loop would spin. Let the action finish on its own.
+                    action.join();
+                }
+            }
+
+            out.close();
+            in.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+        finally
+        {
+            closeSocket();
+        }
+    }
+
+    /** Closes the client socket, ignoring failures since nothing more can be done with it. */
+    private void closeSocket()
+    {
+        try
+        {
+            socket.close();
+        }
+        catch (IOException ignored)
+        {
+            // best effort: the connection is being discarded anyway
+        }
+    }
+}

Propchange: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/server/StressThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java?rev=1130223&r1=1130222&r2=1130223&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java (original)
+++ cassandra/branches/cassandra-0.8/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Wed Jun  1 16:55:27 2011
@@ -46,6 +46,12 @@ public abstract class Operation
         session = Stress.session;
     }
 
+    public Operation(Session client, int idx)
+    {
+        index = idx;
+        session = client;
+    }
+
     /**
      * Run operation
      * @param client Cassandra Thrift client connection
@@ -101,18 +107,18 @@ public abstract class Operation
      * key generator using Gauss or Random algorithm
      * @return byte[] representation of the key string
      */
-    protected static byte[] generateKey()
+    protected byte[] generateKey()
     {
-        return (Stress.session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
+        return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey();
     }
 
     /**
      * Random key generator
      * @return byte[] representation of the key string
      */
-    private static byte[] generateRandomKey()
+    private byte[] generateRandomKey()
     {
-        String format = "%0" + Stress.session.getTotalKeysLength() + "d";
-        return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8);
+        String format = "%0" + session.getTotalKeysLength() + "d";
+        return String.format(format, Stress.randomizer.nextInt(session.getNumDifferentKeys() - 1)).getBytes(UTF_8);
     }
 
@@ -120,9 +126,8 @@ public abstract class Operation
      * Gauss key generator
      * @return byte[] representation of the key string
      */
-    private static byte[] generateGaussKey()
+    private byte[] generateGaussKey()
     {
-        Session session = Stress.session;
         String format = "%0" + session.getTotalKeysLength() + "d";
 
         for (;;)