You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/02/17 23:13:41 UTC

svn commit: r1071811 - in /cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress: ./ operations/

Author: brandonwilliams
Date: Thu Feb 17 22:13:40 2011
New Revision: 1071811

URL: http://svn.apache.org/viewvc?rev=1071811&view=rev
Log:
Add 'keep trying' behavior to stress.java.
Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2047

Modified:
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
    cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Thu Feb 17 22:13:40 2011
@@ -61,7 +61,8 @@ public class Session
         availableOptions.addOption("o",  "operation",            true,   "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET), default:INSERT");
         availableOptions.addOption("u",  "supercolumns",         true,   "Number of super columns per key, default:1");
         availableOptions.addOption("y",  "family-type",          true,   "Column Family Type (Super, Standard), default:Standard");
-        availableOptions.addOption("k",  "keep-going",           false,  "Ignore errors inserting or reading, default:false");
+        availableOptions.addOption("K",  "keep-trying",          true,   "Retry on-going operation N times (in case of failure). positive integer, default:10");
+        availableOptions.addOption("k",  "keep-going",           false,  "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false");
         availableOptions.addOption("i",  "progress-interval",    true,   "Progress Report Interval (seconds), default:10");
         availableOptions.addOption("g",  "keys-per-call",        true,   "Number of keys to get_range_slices or multiget per call, default:1000");
         availableOptions.addOption("l",  "replication-factor",   true,   "Replication Factor to use when creating needed column families, default:1");
@@ -80,13 +81,14 @@ public class Session
     private String[] nodes       = new String[] { "127.0.0.1" };
     private boolean random       = false;
     private boolean unframed     = false;
-    private boolean ignoreErrors = false;
+    private int retryTimes       = 10;
     private int port             = 9160;
     private int superColumns     = 1;
 
     private int progressInterval  = 10;
     private int keysPerCall       = 1000;
     private int replicationFactor = 1;
+    private boolean ignoreErrors  = false;
 
     private PrintStream out = System.out;
 
@@ -97,6 +99,7 @@ public class Session
     private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
     private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
 
+
     // required by Gaussian distribution.
     protected int   mean;
     protected float sigma;
@@ -188,8 +191,21 @@ public class Session
             if (cmd.hasOption("y"))
                 columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y"));
 
+            if (cmd.hasOption("K"))
+            {
+                retryTimes = Integer.valueOf(cmd.getOptionValue("K"));
+
+                if (retryTimes <= 0)
+                {
+                    throw new RuntimeException("--keep-trying option value should be > 0");
+                }
+            }
+
             if (cmd.hasOption("k"))
+            {
+                retryTimes = 1;
                 ignoreErrors = true;
+            }
 
             if (cmd.hasOption("i"))
                 progressInterval = Integer.parseInt(cmd.getOptionValue("i"));
@@ -297,6 +313,11 @@ public class Session
         return consistencyLevel;
     }
 
+    public int getRetryTimes()
+    {
+        return retryTimes;
+    }
+
     public boolean ignoreErrors()
     {
         return ignoreErrors;

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Thu Feb 17 22:13:40 2011
@@ -63,21 +63,33 @@ public class IndexedRangeSlicer extends 
                 List<KeySlice> results = null;
                 long start = System.currentTimeMillis();
 
-                try
+                boolean success = false;
+                String exceptionMessage = null;
+
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+                    if (success)
+                        break;
 
-                    if (results.size() == 0)
+                    try
                     {
-                        System.err.printf("No indexed values from offset received: %s%n", startOffset);
-
-                        if (!session.ignoreErrors())
-                            break;
+                        results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+                        success = (results.size() != 0);
+                    }
+                    catch (Exception e)
+                    {
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
                 }
-                catch (Exception e)
+
+                if (!success)
                 {
-                    System.err.printf("Error on get_indexed_slices call for offset  %s - %s%n", startOffset, getExceptionMessage(e));
+                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+                                                                                              index,
+                                                                                              session.getRetryTimes(),
+                                                                                              startOffset,
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
                         return;

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Thu Feb 17 22:13:40 2011
@@ -64,7 +64,8 @@ public class Inserter extends OperationT
 
         for (int i : range)
         {
-            ByteBuffer key = ByteBuffer.wrap(String.format(format, i).getBytes());
+            String rawKey = String.format(format, i);
+            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
             Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
 
             record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
@@ -78,23 +79,35 @@ public class Inserter extends OperationT
 
             long start = System.currentTimeMillis();
 
-            try
-            {
-                client.batch_mutate(record, session.getConsistencyLevel());
-            }
-            catch (Exception e)
+            boolean success = false;
+            String exceptionMessage = null;
+
+            for (int t = 0; t < session.getRetryTimes(); t++)
             {
+                if (success)
+                    break;
+
                 try
                 {
-                    System.err.printf("Error while inserting key %s - %s%n", ByteBufferUtil.string(key), getExceptionMessage(e));
+                    client.batch_mutate(record, session.getConsistencyLevel());
+                    success = true;
                 }
-                catch (CharacterCodingException e1)
+                catch (Exception e)
                 {
-                    throw new AssertionError(e1); // keys are valid strings
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
                 }
+            }
+
+            if (!success)
+            {
+                System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
+                                                                                                session.getRetryTimes(),
+                                                                                                rawKey,
+                                                                                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                 if (!session.ignoreErrors())
-                    return;
+                    break;
             }
 
             session.operationCount.getAndIncrement(index);

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Thu Feb 17 22:13:40 2011
@@ -55,21 +55,32 @@ public class MultiGetter extends Operati
 
                     long start = System.currentTimeMillis();
 
-                    try
+                    boolean success = false;
+                    String exceptionMessage = null;
+
+                    for (int t = 0; t < session.getRetryTimes(); t++)
                     {
-                        results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                        if (success)
+                            break;
 
-                        if (results.size() == 0)
+                        try
                         {
-                            System.err.printf("Keys %s were not found.%n", keys);
-
-                            if (!session.ignoreErrors())
-                                break;
+                            results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                            success = (results.size() != 0);
+                        }
+                        catch (Exception e)
+                        {
+                            exceptionMessage = getExceptionMessage(e);
                         }
                     }
-                    catch (Exception e)
+
+                    if (!success)
                     {
-                        System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+                        System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                                                                              index,
+                                                                                              session.getRetryTimes(),
+                                                                                              keys,
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                         if (!session.ignoreErrors())
                             return;
@@ -93,21 +104,33 @@ public class MultiGetter extends Operati
 
                 long start = System.currentTimeMillis();
 
-                try
+                boolean success = false;
+                String exceptionMessage = null;
+
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                    if (success)
+                        break;
 
-                    if (results.size() == 0)
+                    try
                     {
-                        System.err.printf("Keys %s were not found.%n", keys);
-
-                        if (!session.ignoreErrors())
-                            break;
+                        results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                        success = (results.size() != 0);
+                    }
+                    catch (Exception e)
+                    {
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
                 }
-                catch (Exception e)
+
+                if (!success)
                 {
-                    System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+                    System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                                                                        index,
+                                                                                        session.getRetryTimes(),
+                                                                                        keys,
+                                                                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
                         return;

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Thu Feb 17 22:13:40 2011
@@ -64,21 +64,31 @@ public class RangeSlicer extends Operati
 
                     long startTime = System.currentTimeMillis();
 
-                    try
-                    {
-                        slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                    boolean success = false;
+                    String exceptionMessage = null;
 
-                        if (slices.size() == 0)
+                    for (int t = 0; t < session.getRetryTimes(); t++)
+                    {
+                        try
                         {
-                            System.err.printf("Range %s->%s not found in Super Column %s.%n", new String(start), new String(end), superColumnName);
-
-                            if (!session.ignoreErrors())
-                                break;
+                            slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                            success = (slices.size() != 0);
+                        }
+                        catch (Exception e)
+                        {
+                            exceptionMessage = getExceptionMessage(e);
+                            success = false;
                         }
                     }
-                    catch (Exception e)
+
+                    if (!success)
                     {
-                        System.err.printf("Error while reading Super Column %s - %s%n", superColumnName, getExceptionMessage(e));
+                        System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
+                                                                                            index,
+                                                                                            session.getRetryTimes(),
+                                                                                            new String(start),
+                                                                                            new String(end),
+                                                                                            (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                         if (!session.ignoreErrors())
                             return;
@@ -107,21 +117,34 @@ public class RangeSlicer extends Operati
 
                 long startTime = System.currentTimeMillis();
 
-                try
+                boolean success = false;
+                String exceptionMessage = null;
+
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                    if (success)
+                        break;
 
-                    if (slices.size() == 0)
+                    try
                     {
-                        System.err.printf("Range %s->%s not found.%n", String.format(format, current), String.format(format, last));
-
-                        if (!session.ignoreErrors())
-                            break;
+                        slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                        success = (slices.size() != 0);
+                    }
+                    catch (Exception e)
+                    {
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
                 }
-                catch (Exception e)
+
+                if (!success)
                 {
-                    System.err.printf("Error while reading range %s->%s - %s%n", String.format(format, current), String.format(format, last), getExceptionMessage(e));
+                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
+                                                                                              index,
+                                                                                              session.getRetryTimes(),
+                                                                                              new String(start),
+                                                                                              new String(end),
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
                         return;

Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1071811&r1=1071810&r2=1071811&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Thu Feb 17 22:13:40 2011
@@ -22,6 +22,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import java.io.IOException;
 import java.lang.AssertionError;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -61,7 +62,8 @@ public class Reader extends OperationThr
     {
         for (int i = 0; i < session.getKeysPerThread(); i++)
         {
-            ByteBuffer key = ByteBuffer.wrap(generateKey());
+            byte[] rawKey = generateKey();
+            ByteBuffer key = ByteBuffer.wrap(rawKey);
 
             for (int j = 0; j < session.getSuperColumns(); j++)
             {
@@ -70,32 +72,36 @@ public class Reader extends OperationThr
 
                 long start = System.currentTimeMillis();
 
-                try
-                {
-                    List<ColumnOrSuperColumn> columns;
-                    columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+                boolean success = false;
+                String exceptionMessage = null;
 
-                    if (columns.size() == 0)
-                    {
-                        System.err.printf("Key %s not found in Super Column %s.%n", ByteBufferUtil.string(key), superColumn);
-
-                        if (!session.ignoreErrors())
-                            break;
-                    }
-                }
-                catch (Exception e)
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
+                    if (success)
+                        break;
+
                     try
                     {
-                        System.err.printf("Error while reading Super Column %s key %s - %s%n", superColumn, ByteBufferUtil.string(key), getExceptionMessage(e));
+                        List<ColumnOrSuperColumn> columns;
+                        columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+                        success = (columns.size() != 0);
                     }
-                    catch (CharacterCodingException e1)
+                    catch (Exception e)
                     {
-                        throw new AssertionError(e1); // keys are valid string
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
+                }
+
+                if (!success)
+                {
+                    System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+                                                                                                  session.getRetryTimes(),
+                                                                                                  new String(rawKey),
+                                                                                                  (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
-                        break;
+                        return;
                 }
 
                 session.operationCount.getAndIncrement(index);
@@ -116,25 +122,36 @@ public class Reader extends OperationThr
 
             long start = System.currentTimeMillis();
 
-            try
+            boolean success = false;
+            String exceptionMessage = null;
+
+            for (int t = 0; t < session.getRetryTimes(); t++)
             {
-                List<ColumnOrSuperColumn> columns;
-                columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                if (success)
+                    break;
 
-                if (columns.size() == 0)
+                try
                 {
-                    System.err.println(String.format("Key %s not found.", new String(key)));
-
-                    if (!session.ignoreErrors())
-                        break;
+                    List<ColumnOrSuperColumn> columns;
+                    columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                    success = (columns.size() != 0);
+                }
+                catch (Exception e)
+                {
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
                 }
             }
-            catch (Exception e)
+
+            if (!success)
             {
-                System.err.printf("Error while reading key %s - %s%n", new String(key), getExceptionMessage(e));
+                System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+                                                                                              session.getRetryTimes(),
+                                                                                              new String(key),
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                 if (!session.ignoreErrors())
-                    break;
+                    return;
             }
 
             session.operationCount.getAndIncrement(index);