You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/23 20:25:40 UTC

svn commit: r1073894 - in /cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress: ./ operations/ util/

Author: brandonwilliams
Date: Wed Feb 23 19:25:40 2011
New Revision: 1073894

URL: http://svn.apache.org/viewvc?rev=1073894&view=rev
Log:
Switch stress.java to a producer/consumer model for better performance.
Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2020

Removed:
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java
Modified:
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Wed Feb 23 19:25:40 2011
@@ -20,8 +20,8 @@ package org.apache.cassandra.contrib.str
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.cli.*;
 
@@ -38,9 +38,9 @@ public class Session
     // command line options
     public static final Options availableOptions = new Options();
 
-    public final AtomicIntegerArray operationCount;
-    public final AtomicIntegerArray keyCount;
-    public final AtomicLongArray latencies;
+    public final AtomicInteger operations;
+    public final AtomicInteger keys;
+    public final AtomicLong    latency;
 
     static
     {
@@ -93,7 +93,7 @@ public class Session
     private PrintStream out = System.out;
 
     private IndexType indexType = null;
-    private Stress.Operation operation = Stress.Operation.INSERT;
+    private Stress.Operations operation = Stress.Operations.INSERT;
     private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
     private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
     private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
@@ -183,7 +183,7 @@ public class Session
                 unframed = Boolean.parseBoolean(cmd.getOptionValue("m"));
 
             if (cmd.hasOption("o"))
-                operation = Stress.Operation.valueOf(cmd.getOptionValue("o").toUpperCase());
+                operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase());
 
             if (cmd.hasOption("u"))
                 superColumns = Integer.parseInt(cmd.getOptionValue("u"));
@@ -248,9 +248,9 @@ public class Session
         mean  = numKeys / 2;
         sigma = numKeys * STDev;
 
-        operationCount = new AtomicIntegerArray(threads);
-        keyCount = new AtomicIntegerArray(threads);
-        latencies = new AtomicLongArray(threads);
+        operations = new AtomicInteger();
+        keys = new AtomicInteger();
+        latency = new AtomicLong();
     }
 
     public int getCardinality()
@@ -323,7 +323,7 @@ public class Session
         return ignoreErrors;
     }
 
-    public Stress.Operation getOperation()
+    public Stress.Operations getOperation()
     {
         return operation;
     }

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java Wed Feb 23 19:25:40 2011
@@ -18,15 +18,18 @@
 package org.apache.cassandra.contrib.stress;
 
 import org.apache.cassandra.contrib.stress.operations.*;
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
+import org.apache.cassandra.thrift.Cassandra;
 import org.apache.commons.cli.Option;
 
 import java.io.PrintStream;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 
 public final class Stress
 {
-    public static enum Operation
+    public static enum Operations
     {
         INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET
     }
@@ -34,9 +37,15 @@ public final class Stress
     public static Session session;
     public static Random randomizer = new Random();
 
+    /**
+     * Producer-Consumer model: 1 producer, N consumers
+     */
+    private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true);
+
     public static void main(String[] arguments) throws Exception
     {
-        int epoch, total, oldTotal, latency, keyCount, oldKeyCount, oldLatency;
+        long latency, oldLatency;
+        int epoch, total, oldTotal, keyCount, oldKeyCount;
 
         try
         {
@@ -49,51 +58,52 @@ public final class Stress
         }
 
         // creating keyspace and column families
-        if (session.getOperation() == Stress.Operation.INSERT)
+        if (session.getOperation() == Stress.Operations.INSERT)
         {
             session.createKeySpaces();
         }
 
         int threadCount  = session.getThreads();
-        Thread[] threads = new Thread[threadCount];
-        PrintStream out  = session.getOutputStream();
+        Thread[] consumers = new Thread[threadCount];
+        PrintStream out = session.getOutputStream();
+
+        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
+
+        int itemsPerThread = session.getKeysPerThread();
+        int modulo = session.getNumKeys() % threadCount;
 
         // creating required type of the threads for the test
-        try
-        {
-            for (int i = 0; i < threadCount; i++)
-            {
-                threads[i] = createOperation(i);
-            }
-        }
-        catch (Exception e)
+        for (int i = 0; i < threadCount; i++)
         {
-            System.err.println(e.getMessage());
-            return;
+            if (i == threadCount - 1)
+                itemsPerThread += modulo; // last one is going to handle N + modulo items
+
+            consumers[i] = new Consumer(itemsPerThread);
         }
 
+        new Producer().start();
+
         // starting worker threads
         for (int i = 0; i < threadCount; i++)
         {
-            threads[i].start();
+            consumers[i].start();
         }
 
         // initialization of the values
         boolean terminate = false;
-        epoch = total = latency = keyCount = 0;
+        latency = 0;
+        epoch = total = keyCount = 0;
 
         int interval = session.getProgressInterval();
         int epochIntervals = session.getProgressInterval() * 10;
         long testStartTime = System.currentTimeMillis();
 
-        out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time");
-
         while (!terminate)
         {
             Thread.sleep(100);
 
             int alive = 0;
-            for (Thread thread : threads)
+            for (Thread thread : consumers)
                 if (thread.isAlive()) alive++;
 
             if (alive == 0)
@@ -109,20 +119,9 @@ public final class Stress
                 oldLatency  = latency;
                 oldKeyCount = keyCount;
 
-                int currentTotal = 0, currentKeyCount = 0, currentLatency = 0;
-
-                for (Thread t : threads)
-                {
-                    OperationThread thread = (OperationThread) t;
-
-                    currentTotal    += session.operationCount.get(thread.index);
-                    currentKeyCount += session.keyCount.get(thread.index);
-                    currentLatency  += session.latencies.get(thread.index);
-                }
-
-                total    = currentTotal;
-                keyCount = currentKeyCount;
-                latency  = currentLatency;
+                total    = session.operations.get();
+                keyCount = session.keys.get();
+                latency  = session.latency.get();
 
                 int opDelta  = total - oldTotal;
                 int keyDelta = keyCount - oldKeyCount;
@@ -136,7 +135,7 @@ public final class Stress
         }
     }
 
-    private static Thread createOperation(int index)
+    private static Operation createOperation(int index)
     {
         switch (session.getOperation())
         {
@@ -174,4 +173,58 @@ public final class Stress
                                                             option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription()));
         }
     }
+
+    /**
+     * Produces exactly N items (awaits each to be consumed)
+     */
+    private static class Producer extends Thread
+    {
+        public void run()
+        {
+            for (int i = 0; i < session.getNumKeys(); i++)
+            {
+                try
+                {
+                    operations.put(createOperation(i));
+                }
+                catch (InterruptedException e)
+                {
+                    System.err.println("Producer error - " + e.getMessage());
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Each consumes exactly N items from queue
+     */
+    private static class Consumer extends Thread
+    {
+        private final int items;
+
+        public Consumer(int toConsume)
+        {
+            items = toConsume;
+        }
+
+        public void run()
+        {
+            Cassandra.Client client = session.getClient();
+
+            for (int i = 0; i < items; i++)
+            {
+                try
+                {
+                    operations.take().run(client); // running job
+                }
+                catch (Exception e)
+                {
+                    System.err.println(e.getMessage());
+                    System.exit(-1);
+                }
+            }
+        }
+    }
+
 }

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Wed Feb 23 19:25:40 2011
@@ -17,23 +17,23 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
-public class IndexedRangeSlicer extends OperationThread
+public class IndexedRangeSlicer extends Operation
 {
     public IndexedRangeSlicer(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         String format = "%0" + session.getTotalKeysLength() + "d";
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
@@ -46,64 +46,59 @@ public class IndexedRangeSlicer extends 
 
         ByteBuffer columnName = ByteBuffer.wrap("C1".getBytes());
 
-        for (int i = range.begins(); i < range.size(); i++)
-        {
-            int received = 0;
+        int received = 0;
 
-            String startOffset = "0";
-            ByteBuffer value = ByteBuffer.wrap(values.get(i % values.size()).getBytes());
+        String startOffset = "0";
+        ByteBuffer value = ByteBuffer.wrap(values.get(index % values.size()).getBytes());
 
-            IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
+        IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value);
 
-            while (received < expectedPerValue)
-            {
-                IndexClause clause = new IndexClause(Arrays.asList(expression), ByteBuffer.wrap(startOffset.getBytes()),
-                                                                                session.getKeysPerCall());
+        while (received < expectedPerValue)
+        {
+            IndexClause clause = new IndexClause(Arrays.asList(expression),
+                                                 ByteBuffer.wrap(startOffset.getBytes()),
+                                                 session.getKeysPerCall());
 
-                List<KeySlice> results = null;
-                long start = System.currentTimeMillis();
+            List<KeySlice> results = null;
+            long start = System.currentTimeMillis();
 
-                boolean success = false;
-                String exceptionMessage = null;
+            boolean success = false;
+            String exceptionMessage = null;
 
-                for (int t = 0; t < session.getRetryTimes(); t++)
-                {
-                    if (success)
-                        break;
+            for (int t = 0; t < session.getRetryTimes(); t++)
+            {
+                if (success)
+                    break;
 
-                    try
-                    {
-                        results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
-                        success = (results.size() != 0);
-                    }
-                    catch (Exception e)
-                    {
-                        exceptionMessage = getExceptionMessage(e);
-                        success = false;
-                    }
+                try
+                {
+                    results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+                    success = (results.size() != 0);
                 }
-
-                if (!success)
+                catch (Exception e)
                 {
-                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
-                                                                                              index,
-                                                                                              session.getRetryTimes(),
-                                                                                              startOffset,
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
                 }
+            }
+
+            if (!success)
+            {
+                error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    startOffset,
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
 
-                received += results.size();
+            received += results.size();
 
-                // convert max key found back to an integer, and increment it
-                startOffset = String.format(format, (1 + getMaxKey(results)));
+            // convert max key found back to an integer, and increment it
+            startOffset = String.format(format, (1 + getMaxKey(results)));
 
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndAdd(index, results.size());
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-            }
+            session.operations.getAndIncrement();
+            session.keys.getAndAdd(results.size());
+            session.latency.getAndAdd(System.currentTimeMillis() - start);
         }
     }
 

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Wed Feb 23 19:25:40 2011
@@ -17,26 +17,27 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class Inserter extends OperationThread
+public class Inserter extends Operation
 {
+
     public Inserter(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         List<String> values  = generateValues();
         List<Column> columns = new ArrayList<Column>();
@@ -48,8 +49,10 @@ public class Inserter extends OperationT
         // columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
         for (int i = 0; i < session.getColumnsPerKey(); i++)
         {
-            byte[] columnName = ("C" + Integer.toString(i)).getBytes();
-            columns.add(new Column(ByteBuffer.wrap(columnName), ByteBuffer.wrap(new byte[] {}), System.currentTimeMillis()));
+            String columnName  = ("C" + Integer.toString(i));
+            String columnValue = values.get(index % values.size());
+
+            columns.add(new Column(ByteBufferUtil.bytes(columnName), ByteBufferUtil.bytes(columnValue), System.currentTimeMillis()));
         }
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
@@ -62,58 +65,47 @@ public class Inserter extends OperationT
             }
         }
 
-        for (int i : range)
-        {
-            String rawKey = String.format(format, i);
-            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
-            Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
-
-            record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
-                                                          ? getSuperColumnsMutationMap(superColumns)
-                                                          : getColumnsMutationMap(columns));
+        String rawKey = String.format(format, index);
+        Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
 
-            String value = values.get(i % values.size());
+        record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super
+                                                                                ? getSuperColumnsMutationMap(superColumns)
+                                                                                : getColumnsMutationMap(columns));
 
-            for (Column c : columns)
-                c.value = ByteBuffer.wrap(value.getBytes());
+        long start = System.currentTimeMillis();
 
-            long start = System.currentTimeMillis();
+        boolean success = false;
+        String exceptionMessage = null;
 
-            boolean success = false;
-            String exceptionMessage = null;
+        for (int t = 0; t < session.getRetryTimes(); t++)
+        {
+            if (success)
+                break;
 
-            for (int t = 0; t < session.getRetryTimes(); t++)
+            try
             {
-                if (success)
-                    break;
-
-                try
-                {
-                    client.batch_mutate(record, session.getConsistencyLevel());
-                    success = true;
-                }
-                catch (Exception e)
-                {
-                    exceptionMessage = getExceptionMessage(e);
-                    success = false;
-                }
+                client.batch_mutate(record, session.getConsistencyLevel());
+                success = true;
             }
-
-            if (!success)
+            catch (Exception e)
             {
-                System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
-                                                                                                session.getRetryTimes(),
-                                                                                                rawKey,
-                                                                                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                if (!session.ignoreErrors())
-                    break;
+                exceptionMessage = getExceptionMessage(e);
+                success = false;
             }
+        }
 
-            session.operationCount.getAndIncrement(index);
-            session.keyCount.getAndIncrement(index);
-            session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+        if (!success)
+        {
+            error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n",
+                                index,
+                                session.getRetryTimes(),
+                                rawKey,
+                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
         }
+
+        session.operations.getAndIncrement();
+        session.keys.getAndIncrement();
+        session.latency.getAndAdd(System.currentTimeMillis() - start);
     }
 
     private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns)

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Wed Feb 23 19:25:40 2011
@@ -17,90 +17,39 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public class MultiGetter extends OperationThread
+public class MultiGetter extends Operation
 {
     public MultiGetter(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}),
                                                                                       ByteBuffer.wrap(new byte[] {}),
                                                                                       false, session.getColumnsPerKey()));
 
         int offset = index * session.getKeysPerThread();
-        Map<ByteBuffer,List<ColumnOrSuperColumn>> results = null;
-        int count  = (((index + 1) * session.getKeysPerThread()) - offset) / session.getKeysPerCall();
+        Map<ByteBuffer,List<ColumnOrSuperColumn>> results;
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
-            for (int i = 0; i < count; i++)
-            {
-                List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
-
-                for (int j = 0; j < session.getSuperColumns(); j++)
-                {
-                    ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
-
-                    long start = System.currentTimeMillis();
-
-                    boolean success = false;
-                    String exceptionMessage = null;
-
-                    for (int t = 0; t < session.getRetryTimes(); t++)
-                    {
-                        if (success)
-                            break;
-
-                        try
-                        {
-                            results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
-                            success = (results.size() != 0);
-                        }
-                        catch (Exception e)
-                        {
-                            exceptionMessage = getExceptionMessage(e);
-                        }
-                    }
-
-                    if (!success)
-                    {
-                        System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
-                                                                                              index,
-                                                                                              session.getRetryTimes(),
-                                                                                              keys,
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                        if (!session.ignoreErrors())
-                            return;
-                    }
-
-                    session.operationCount.getAndIncrement(index);
-                    session.keyCount.getAndAdd(index, keys.size());
-                    session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-
-                    offset += session.getKeysPerCall();
-                }
-            }
-        }
-        else
-        {
-            ColumnParent parent = new ColumnParent("Standard1");
+            List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
 
-            for (int i = 0; i < count; i++)
+            for (int j = 0; j < session.getSuperColumns(); j++)
             {
-                List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+                ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes());
 
                 long start = System.currentTimeMillis();
 
@@ -120,29 +69,68 @@ public class MultiGetter extends Operati
                     catch (Exception e)
                     {
                         exceptionMessage = getExceptionMessage(e);
-                        success = false;
                     }
                 }
 
                 if (!success)
                 {
-                    System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
-                                                                                        index,
-                                                                                        session.getRetryTimes(),
-                                                                                        keys,
-                                                                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
+                    error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                        index,
+                                        session.getRetryTimes(),
+                                        keys,
+                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
                 }
 
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndAdd(index, keys.size());
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+                session.operations.getAndIncrement();
+                session.keys.getAndAdd(keys.size());
+                session.latency.getAndAdd(System.currentTimeMillis() - start);
 
                 offset += session.getKeysPerCall();
             }
         }
+        else
+        {
+            ColumnParent parent = new ColumnParent("Standard1");
+
+            List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall());
+
+            long start = System.currentTimeMillis();
+
+            boolean success = false;
+            String exceptionMessage = null;
+
+            for (int t = 0; t < session.getRetryTimes(); t++)
+            {
+                if (success)
+                    break;
+
+                try
+                {
+                    results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                    success = (results.size() != 0);
+                }
+                catch (Exception e)
+                {
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
+                }
+            }
+
+            if (!success)
+            {
+                error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    keys,
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
+
+            session.operations.getAndIncrement();
+            session.keys.getAndAdd(keys.size());
+            session.latency.getAndAdd(System.currentTimeMillis() - start);
+
+            offset += session.getKeysPerCall();
+        }
     }
 
     private List<ByteBuffer> generateKeys(int start, int limit)

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Wed Feb 23 19:25:40 2011
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class RangeSlicer extends OperationThread
+public class RangeSlicer extends Operation
 {
 
     public RangeSlicer(int index)
@@ -33,87 +35,29 @@ public class RangeSlicer extends Operati
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         String format = "%0" + session.getTotalKeysLength() + "d";
 
         // initial values
-        int current = range.begins();
-        int limit   = range.limit();
-        int count   = session.getColumnsPerKey();
-        int last    = current + session.getKeysPerCall();
+        int count = session.getColumnsPerKey();
 
         SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[] {}),
                                                                                       ByteBuffer.wrap(new byte[] {}),
-                                                                                      false, count));
+                                                                                      false,
+                                                                                      count));
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
-            while (current < limit)
-            {
-                byte[] start = String.format(format, current).getBytes();
-                byte[] end   = String.format(format, last).getBytes();
-
-                List<KeySlice> slices = new ArrayList<KeySlice>();
-                KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
-
-                for (int i = 0; i < session.getSuperColumns(); i++)
-                {
-                    String superColumnName = "S" + Integer.toString(i);
-                    ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
-
-                    long startTime = System.currentTimeMillis();
-
-                    boolean success = false;
-                    String exceptionMessage = null;
-
-                    for (int t = 0; t < session.getRetryTimes(); t++)
-                    {
-                        try
-                        {
-                            slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
-                            success = (slices.size() != 0);
-                        }
-                        catch (Exception e)
-                        {
-                            exceptionMessage = getExceptionMessage(e);
-                            success = false;
-                        }
-                    }
-
-                    if (!success)
-                    {
-                        System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
-                                                                                            index,
-                                                                                            session.getRetryTimes(),
-                                                                                            new String(start),
-                                                                                            new String(end),
-                                                                                            (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+            byte[] start = String.format(format, index).getBytes();
 
-                        if (!session.ignoreErrors())
-                            return;
-                    }
-
-                    session.operationCount.getAndIncrement(index);
-                    session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
-                }
+            List<KeySlice> slices = new ArrayList<KeySlice>();
+            KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
 
-                current += slices.size() + 1;
-                last = current + slices.size() + 1;
-                session.keyCount.getAndAdd(index, slices.size());
-            }
-        }
-        else
-        {
-            ColumnParent parent = new ColumnParent("Standard1");
-
-            while (current < limit)
+            for (int i = 0; i < session.getSuperColumns(); i++)
             {
-                byte[] start = String.format(format, current).getBytes();
-                byte[] end   = String.format(format, last).getBytes();
-
-                List<KeySlice> slices = new ArrayList<KeySlice>();
-                KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end);
+                String superColumnName = "S" + Integer.toString(i);
+                ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes()));
 
                 long startTime = System.currentTimeMillis();
 
@@ -122,9 +66,6 @@ public class RangeSlicer extends Operati
 
                 for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    if (success)
-                        break;
-
                     try
                     {
                         slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
@@ -139,24 +80,62 @@ public class RangeSlicer extends Operati
 
                 if (!success)
                 {
-                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
-                                                                                              index,
-                                                                                              session.getRetryTimes(),
-                                                                                              new String(start),
-                                                                                              new String(end),
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
+                    error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n",
+                                        index,
+                                        session.getRetryTimes(),
+                                        new String(start),
+                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
                 }
 
-                current += slices.size() + 1;
-                last = current + slices.size() + 1;
+                session.operations.getAndIncrement();
+                session.latency.getAndAdd(System.currentTimeMillis() - startTime);
+            }
+
+            session.keys.getAndAdd(slices.size());
+        }
+        else
+        {
+            ColumnParent parent = new ColumnParent("Standard1");
+
+            byte[] start = String.format(format, index).getBytes();
+
+            List<KeySlice> slices = new ArrayList<KeySlice>();
+            KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+
+            long startTime = System.currentTimeMillis();
+
+            boolean success = false;
+            String exceptionMessage = null;
 
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndAdd(index, slices.size());
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime);
+            for (int t = 0; t < session.getRetryTimes(); t++)
+            {
+                if (success)
+                    break;
+
+                try
+                {
+                    slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                    success = (slices.size() != 0);
+                }
+                catch (Exception e)
+                {
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
+                }
             }
+
+            if (!success)
+            {
+                error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    new String(start),
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
+
+            session.operations.getAndIncrement();
+            session.keys.getAndAdd(slices.size());
+            session.latency.getAndAdd(System.currentTimeMillis() - startTime);
         }
     }
 }

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1073894&r1=1073893&r2=1073894&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Wed Feb 23 19:25:40 2011
@@ -17,25 +17,22 @@
  */
 package org.apache.cassandra.contrib.stress.operations;
 
-import org.apache.cassandra.contrib.stress.util.OperationThread;
+import org.apache.cassandra.contrib.stress.util.Operation;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 import java.io.IOException;
-import java.lang.AssertionError;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.List;
 
-public class Reader extends OperationThread
+public class Reader extends Operation
 {
     public Reader(int index)
     {
         super(index);
     }
 
-    public void run()
+    public void run(Cassandra.Client client) throws IOException
     {
         SliceRange sliceRange = new SliceRange();
 
@@ -50,75 +47,23 @@ public class Reader extends OperationThr
 
         if (session.getColumnFamilyType() == ColumnFamilyType.Super)
         {
-            runSuperColumnReader(predicate);
+            runSuperColumnReader(predicate, client);
         }
         else
         {
-            runColumnReader(predicate);
+            runColumnReader(predicate, client);
         }
     }
 
-    private void runSuperColumnReader(SlicePredicate predicate)
+    private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
     {
-        for (int i = 0; i < session.getKeysPerThread(); i++)
-        {
-            byte[] rawKey = generateKey();
-            ByteBuffer key = ByteBuffer.wrap(rawKey);
-
-            for (int j = 0; j < session.getSuperColumns(); j++)
-            {
-                String superColumn = 'S' + Integer.toString(j);
-                ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
-
-                long start = System.currentTimeMillis();
-
-                boolean success = false;
-                String exceptionMessage = null;
-
-                for (int t = 0; t < session.getRetryTimes(); t++)
-                {
-                    if (success)
-                        break;
-
-                    try
-                    {
-                        List<ColumnOrSuperColumn> columns;
-                        columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
-                        success = (columns.size() != 0);
-                    }
-                    catch (Exception e)
-                    {
-                        exceptionMessage = getExceptionMessage(e);
-                        success = false;
-                    }
-                }
-
-                if (!success)
-                {
-                    System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
-                                                                                                  session.getRetryTimes(),
-                                                                                                  new String(rawKey),
-                                                                                                  (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
-
-                    if (!session.ignoreErrors())
-                        return;
-                }
-
-                session.operationCount.getAndIncrement(index);
-                session.keyCount.getAndIncrement(index);
-                session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
-            }
-        }
-    }
-
-    private void runColumnReader(SlicePredicate predicate)
-    {
-        ColumnParent parent = new ColumnParent("Standard1");
+        byte[] rawKey = generateKey();
+        ByteBuffer key = ByteBuffer.wrap(rawKey);
 
-        for (int i = 0; i < session.getKeysPerThread(); i++)
+        for (int j = 0; j < session.getSuperColumns(); j++)
         {
-            byte[] key = generateKey();
-            ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+            String superColumn = 'S' + Integer.toString(j);
+            ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes());
 
             long start = System.currentTimeMillis();
 
@@ -133,7 +78,7 @@ public class Reader extends OperationThr
                 try
                 {
                     List<ColumnOrSuperColumn> columns;
-                    columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                    columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
                     success = (columns.size() != 0);
                 }
                 catch (Exception e)
@@ -145,18 +90,61 @@ public class Reader extends OperationThr
 
             if (!success)
             {
-                System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
-                                                                                              session.getRetryTimes(),
-                                                                                              new String(key),
-                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
+                error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+                                    index,
+                                    session.getRetryTimes(),
+                                    new String(rawKey),
+                                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
+            }
+
+            session.operations.getAndIncrement();
+            session.keys.getAndIncrement();
+            session.latency.getAndAdd(System.currentTimeMillis() - start);
+        }
+    }
+
+    private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException
+    {
+        ColumnParent parent = new ColumnParent("Standard1");
+
+        byte[] key = generateKey();
+        ByteBuffer keyBuffer = ByteBuffer.wrap(key);
+
+        long start = System.currentTimeMillis();
+
+        boolean success = false;
+        String exceptionMessage = null;
 
-                if (!session.ignoreErrors())
-                    return;
+        for (int t = 0; t < session.getRetryTimes(); t++)
+        {
+            if (success)
+                break;
+
+            try
+            {
+                List<ColumnOrSuperColumn> columns;
+                columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                success = (columns.size() != 0);
+            }
+            catch (Exception e)
+            {
+                exceptionMessage = getExceptionMessage(e);
+                success = false;
             }
+        }
 
-            session.operationCount.getAndIncrement(index);
-            session.keyCount.getAndIncrement(index);
-            session.latencies.getAndAdd(index, System.currentTimeMillis() - start);
+        if (!success)
+        {
+            error(String.format("Operation [%d] retried %d times - error reading key %s %s%n",
+                                index,
+                                session.getRetryTimes(),
+                                new String(key),
+                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"));
         }
+
+        session.operations.getAndIncrement();
+        session.keys.getAndIncrement();
+        session.latency.getAndAdd(System.currentTimeMillis() - start);
     }
+
 }