You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/18 23:52:23 UTC

svn commit: r1072184 - in /cassandra/trunk: ./ conf/ contrib/ contrib/py_stress/ contrib/stress/src/org/apache/cassandra/contrib/stress/ contrib/stress/src/org/apache/cassandra/contrib/stress/operations/ interface/thrift/gen-java/org/apache/cassandra/t...

Author: jbellis
Date: Fri Feb 18 22:52:22 2011
New Revision: 1072184

URL: http://svn.apache.org/viewvc?rev=1072184&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/py_stress/stress.py
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
    cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7:1026516-1071868
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Feb 18 22:52:22 2011
@@ -17,7 +17,13 @@
  * fixes for cache save/load (CASSANDRA-2172, -2174)
  * Handle whole-row deletions in CFOutputFormat (CASSANDRA-2014)
  * Make memtable_flush_writers flush in parallel (CASSANDRA-2178)
+ * make key cache preheating default to false; enable with
+   -Dcompaction_preheat_key_cache=true (CASSANDRA-2175)
+ * refactor stress.py to have only one copy of the format string 
+   used for creating row keys (CASSANDRA-2108)
+ * validate index names for \w+ (CASSANDRA-2196)
  * Fix Cassandra cli to respect timeout if schema does not settle (CASSANDRA-2187)
+ * update memtable_throughput to be a long (CASSANDRA-2158)
 
 
 0.7.2
@@ -92,6 +98,7 @@
  * bound hints CF throughput between 32M and 256M (CASSANDRA-2148)
  * continue starting when invalid saved cache entries are encountered
    (CASSANDRA-2076)
+ * add max_hint_window_in_ms option (CASSANDRA-1459)
 
 
 0.7.0-final

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Feb 18 22:52:22 2011
@@ -231,6 +231,11 @@ column_index_size_in_kb: 64
 # will be logged specifying the row key.
 in_memory_compaction_limit_in_mb: 64
 
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable.  Disable if you use really large
+# key caches.
+compaction_preheat_key_cache: true
+
 # Time to wait for a reply from other nodes before failing the command 
 rpc_timeout_in_ms: 10000
 

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.7/contrib:1026516-1071868
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/stress.py?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/stress.py (original)
+++ cassandra/trunk/contrib/py_stress/stress.py Fri Feb 18 22:52:22 2011
@@ -127,6 +127,9 @@ if options.nodefile != None:
     with open(options.nodefile) as f:
         nodes = [n.strip() for n in f.readlines() if len(n.strip()) > 0]
 
+#format string for keys
+fmt = '%0' + str(len(str(total_keys))) + 'd'
+
 # a generator that generates all keys according to a bell curve centered
 # around the middle of the keys generated (0..total_keys).  Remember that
 # about 68% of keys will be within stdev away from the mean and 
@@ -148,7 +151,6 @@ def generate_values():
     return values
 
 def key_generator_gauss():
-    fmt = '%0' + str(len(str(total_keys))) + 'd'
     while True:
         guess = gauss(mean, stdev)
         if 0 <= guess < total_keys:
@@ -157,7 +159,6 @@ def key_generator_gauss():
 # a generator that will generate all keys w/ equal probability.  this is the
 # worst case for caching.
 def key_generator_random():
-    fmt = '%0' + str(len(str(total_keys))) + 'd'
     return fmt % randint(0, total_keys - 1)
 
 key_generator = key_generator_gauss
@@ -220,7 +221,6 @@ class Inserter(Operation):
     def run(self):
         values = generate_values()
         columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
-        fmt = '%0' + str(len(str(total_keys))) + 'd'
         if 'super' == options.cftype:
             supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
         for i in self.range:
@@ -295,7 +295,6 @@ class RangeSlicer(Operation):
         end = self.range[-1]
         current = begin
         last = current + options.rangecount
-        fmt = '%0' + str(len(str(total_keys))) + 'd'
         p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
         if 'super' == options.cftype:
             while current < end:
@@ -348,7 +347,6 @@ class RangeSlicer(Operation):
 # from the thread's appointed range
 class IndexedRangeSlicer(Operation):
     def run(self):
-        fmt = '%0' + str(len(str(total_keys))) + 'd'
         p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
         values = generate_values()
         parent = ColumnParent('Standard1')

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Fri Feb 18 22:52:22 2011
@@ -19,9 +19,7 @@ package org.apache.cassandra.contrib.str
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicLongArray;
 
@@ -29,6 +27,7 @@ import org.apache.commons.cli.*;
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.thrift.*;
+import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
@@ -62,12 +61,15 @@ public class Session
         availableOptions.addOption("o",  "operation",            true,   "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET), default:INSERT");
         availableOptions.addOption("u",  "supercolumns",         true,   "Number of super columns per key, default:1");
         availableOptions.addOption("y",  "family-type",          true,   "Column Family Type (Super, Standard), default:Standard");
-        availableOptions.addOption("k",  "keep-going",           false,  "Ignore errors inserting or reading, default:false");
+        availableOptions.addOption("K",  "keep-trying",          true,   "Retry on-going operation N times (in case of failure). positive integer, default:10");
+        availableOptions.addOption("k",  "keep-going",           false,  "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false");
         availableOptions.addOption("i",  "progress-interval",    true,   "Progress Report Interval (seconds), default:10");
         availableOptions.addOption("g",  "keys-per-call",        true,   "Number of keys to get_range_slices or multiget per call, default:1000");
         availableOptions.addOption("l",  "replication-factor",   true,   "Replication Factor to use when creating needed column families, default:1");
         availableOptions.addOption("e",  "consistency-level",    true,   "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE");
         availableOptions.addOption("x",  "create-index",         true,   "Type of index to create on needed column families (KEYS)");
+        availableOptions.addOption("R",  "replication-strategy", true,   "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy");
+        availableOptions.addOption("O",  "strategy-properties",  true,   "Replication strategy properties in the following format <dc_name>:<num>,<dc_name>:<num>,...");
     }
 
     private int numKeys          = 1000 * 1000;
@@ -79,13 +81,14 @@ public class Session
     private String[] nodes       = new String[] { "127.0.0.1" };
     private boolean random       = false;
     private boolean unframed     = false;
-    private boolean ignoreErrors = false;
+    private int retryTimes       = 10;
     private int port             = 9160;
     private int superColumns     = 1;
 
     private int progressInterval  = 10;
     private int keysPerCall       = 1000;
     private int replicationFactor = 1;
+    private boolean ignoreErrors  = false;
 
     private PrintStream out = System.out;
 
@@ -93,6 +96,9 @@ public class Session
     private Stress.Operation operation = Stress.Operation.INSERT;
     private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard;
     private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
+    private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy";
+    private Map<String, String> replicationStrategyOptions = new HashMap<String, String>();
+
 
     // required by Gaussian distribution.
     protected int   mean;
@@ -185,8 +191,21 @@ public class Session
             if (cmd.hasOption("y"))
                 columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y"));
 
+            if (cmd.hasOption("K"))
+            {
+                retryTimes = Integer.valueOf(cmd.getOptionValue("K"));
+
+                if (retryTimes <= 0)
+                {
+                    throw new RuntimeException("--keep-trying option value should be > 0");
+                }
+            }
+
             if (cmd.hasOption("k"))
+            {
+                retryTimes = 1;
                 ignoreErrors = true;
+            }
 
             if (cmd.hasOption("i"))
                 progressInterval = Integer.parseInt(cmd.getOptionValue("i"));
@@ -202,6 +221,24 @@ public class Session
 
             if (cmd.hasOption("x"))
                 indexType = IndexType.valueOf(cmd.getOptionValue("x").toUpperCase());
+
+            if (cmd.hasOption("R"))
+                replicationStrategy = cmd.getOptionValue("R");
+
+            if (cmd.hasOption("O"))
+            {
+                String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ',');
+
+                for (String pair : pairs)
+                {
+                    String[] keyAndValue = StringUtils.split(pair, ':');
+
+                    if (keyAndValue.length != 2)
+                        throw new RuntimeException("Invalid --strategy-properties value.");
+
+                    replicationStrategyOptions.put(keyAndValue[0], keyAndValue[1]);
+                }
+            }
         }
         catch (ParseException e)
         {
@@ -276,6 +313,11 @@ public class Session
         return consistencyLevel;
     }
 
+    public int getRetryTimes()
+    {
+        return retryTimes;
+    }
+
     public boolean ignoreErrors()
     {
         return ignoreErrors;
@@ -337,8 +379,14 @@ public class Session
         CfDef superCfDef = new CfDef("Keyspace1", "Super1").setColumn_metadata(Arrays.asList(superSubColumn)).setColumn_type("Super");
 
         keyspace.setName("Keyspace1");
-        keyspace.setStrategy_class("org.apache.cassandra.locator.SimpleStrategy");
+        keyspace.setStrategy_class(replicationStrategy);
         keyspace.setReplication_factor(replicationFactor);
+
+        if (!replicationStrategyOptions.isEmpty())
+        {
+            keyspace.setStrategy_options(replicationStrategyOptions);
+        }
+
         keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef)));
 
         Cassandra.Client client = getClient(false);

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Fri Feb 18 22:52:22 2011
@@ -63,21 +63,33 @@ public class IndexedRangeSlicer extends 
                 List<KeySlice> results = null;
                 long start = System.currentTimeMillis();
 
-                try
+                boolean success = false;
+                String exceptionMessage = null;
+
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+                    if (success)
+                        break;
 
-                    if (results.size() == 0)
+                    try
                     {
-                        System.err.printf("No indexed values from offset received: %s%n", startOffset);
-
-                        if (!session.ignoreErrors())
-                            break;
+                        results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel());
+                        success = (results.size() != 0);
+                    }
+                    catch (Exception e)
+                    {
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
                 }
-                catch (Exception e)
+
+                if (!success)
                 {
-                    System.err.printf("Error on get_indexed_slices call for offset  %s - %s%n", startOffset, getExceptionMessage(e));
+                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n",
+                                                                                              index,
+                                                                                              session.getRetryTimes(),
+                                                                                              startOffset,
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
                         return;

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Fri Feb 18 22:52:22 2011
@@ -64,7 +64,8 @@ public class Inserter extends OperationT
 
         for (int i : range)
         {
-            ByteBuffer key = ByteBuffer.wrap(String.format(format, i).getBytes());
+            String rawKey = String.format(format, i);
+            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
             Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
 
             record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super
@@ -78,23 +79,35 @@ public class Inserter extends OperationT
 
             long start = System.currentTimeMillis();
 
-            try
-            {
-                client.batch_mutate(record, session.getConsistencyLevel());
-            }
-            catch (Exception e)
+            boolean success = false;
+            String exceptionMessage = null;
+
+            for (int t = 0; t < session.getRetryTimes(); t++)
             {
+                if (success)
+                    break;
+
                 try
                 {
-                    System.err.printf("Error while inserting key %s - %s%n", ByteBufferUtil.string(key), getExceptionMessage(e));
+                    client.batch_mutate(record, session.getConsistencyLevel());
+                    success = true;
                 }
-                catch (CharacterCodingException e1)
+                catch (Exception e)
                 {
-                    throw new AssertionError(e1); // keys are valid strings
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
                 }
+            }
+
+            if (!success)
+            {
+                System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index,
+                                                                                                session.getRetryTimes(),
+                                                                                                rawKey,
+                                                                                                (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                 if (!session.ignoreErrors())
-                    return;
+                    break;
             }
 
             session.operationCount.getAndIncrement(index);

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Fri Feb 18 22:52:22 2011
@@ -55,21 +55,32 @@ public class MultiGetter extends Operati
 
                     long start = System.currentTimeMillis();
 
-                    try
+                    boolean success = false;
+                    String exceptionMessage = null;
+
+                    for (int t = 0; t < session.getRetryTimes(); t++)
                     {
-                        results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                        if (success)
+                            break;
 
-                        if (results.size() == 0)
+                        try
                         {
-                            System.err.printf("Keys %s were not found.%n", keys);
-
-                            if (!session.ignoreErrors())
-                                break;
+                            results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                            success = (results.size() != 0);
+                        }
+                        catch (Exception e)
+                        {
+                            exceptionMessage = getExceptionMessage(e);
                         }
                     }
-                    catch (Exception e)
+
+                    if (!success)
                     {
-                        System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+                        System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                                                                              index,
+                                                                                              session.getRetryTimes(),
+                                                                                              keys,
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                         if (!session.ignoreErrors())
                             return;
@@ -93,21 +104,33 @@ public class MultiGetter extends Operati
 
                 long start = System.currentTimeMillis();
 
-                try
+                boolean success = false;
+                String exceptionMessage = null;
+
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                    if (success)
+                        break;
 
-                    if (results.size() == 0)
+                    try
                     {
-                        System.err.printf("Keys %s were not found.%n", keys);
-
-                        if (!session.ignoreErrors())
-                            break;
+                        results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel());
+                        success = (results.size() != 0);
+                    }
+                    catch (Exception e)
+                    {
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
                 }
-                catch (Exception e)
+
+                if (!success)
                 {
-                    System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e));
+                    System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n",
+                                                                                        index,
+                                                                                        session.getRetryTimes(),
+                                                                                        keys,
+                                                                                        (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
                         return;

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Fri Feb 18 22:52:22 2011
@@ -64,21 +64,31 @@ public class RangeSlicer extends Operati
 
                     long startTime = System.currentTimeMillis();
 
-                    try
-                    {
-                        slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                    boolean success = false;
+                    String exceptionMessage = null;
 
-                        if (slices.size() == 0)
+                    for (int t = 0; t < session.getRetryTimes(); t++)
+                    {
+                        try
                         {
-                            System.err.printf("Range %s->%s not found in Super Column %s.%n", new String(start), new String(end), superColumnName);
-
-                            if (!session.ignoreErrors())
-                                break;
+                            slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                            success = (slices.size() != 0);
+                        }
+                        catch (Exception e)
+                        {
+                            exceptionMessage = getExceptionMessage(e);
+                            success = false;
                         }
                     }
-                    catch (Exception e)
+
+                    if (!success)
                     {
-                        System.err.printf("Error while reading Super Column %s - %s%n", superColumnName, getExceptionMessage(e));
+                        System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n",
+                                                                                            index,
+                                                                                            session.getRetryTimes(),
+                                                                                            new String(start),
+                                                                                            new String(end),
+                                                                                            (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                         if (!session.ignoreErrors())
                             return;
@@ -107,21 +117,34 @@ public class RangeSlicer extends Operati
 
                 long startTime = System.currentTimeMillis();
 
-                try
+                boolean success = false;
+                String exceptionMessage = null;
+
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
-                    slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                    if (success)
+                        break;
 
-                    if (slices.size() == 0)
+                    try
                     {
-                        System.err.printf("Range %s->%s not found.%n", String.format(format, current), String.format(format, last));
-
-                        if (!session.ignoreErrors())
-                            break;
+                        slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel());
+                        success = (slices.size() != 0);
+                    }
+                    catch (Exception e)
+                    {
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
                 }
-                catch (Exception e)
+
+                if (!success)
                 {
-                    System.err.printf("Error while reading range %s->%s - %s%n", String.format(format, current), String.format(format, last), getExceptionMessage(e));
+                    System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n",
+                                                                                              index,
+                                                                                              session.getRetryTimes(),
+                                                                                              new String(start),
+                                                                                              new String(end),
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
                         return;

Modified: cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original)
+++ cassandra/trunk/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Fri Feb 18 22:52:22 2011
@@ -22,6 +22,7 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import java.io.IOException;
 import java.lang.AssertionError;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -61,7 +62,8 @@ public class Reader extends OperationThr
     {
         for (int i = 0; i < session.getKeysPerThread(); i++)
         {
-            ByteBuffer key = ByteBuffer.wrap(generateKey());
+            byte[] rawKey = generateKey();
+            ByteBuffer key = ByteBuffer.wrap(rawKey);
 
             for (int j = 0; j < session.getSuperColumns(); j++)
             {
@@ -70,32 +72,36 @@ public class Reader extends OperationThr
 
                 long start = System.currentTimeMillis();
 
-                try
-                {
-                    List<ColumnOrSuperColumn> columns;
-                    columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+                boolean success = false;
+                String exceptionMessage = null;
 
-                    if (columns.size() == 0)
-                    {
-                        System.err.printf("Key %s not found in Super Column %s.%n", ByteBufferUtil.string(key), superColumn);
-
-                        if (!session.ignoreErrors())
-                            break;
-                    }
-                }
-                catch (Exception e)
+                for (int t = 0; t < session.getRetryTimes(); t++)
                 {
+                    if (success)
+                        break;
+
                     try
                     {
-                        System.err.printf("Error while reading Super Column %s key %s - %s%n", superColumn, ByteBufferUtil.string(key), getExceptionMessage(e));
+                        List<ColumnOrSuperColumn> columns;
+                        columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel());
+                        success = (columns.size() != 0);
                     }
-                    catch (CharacterCodingException e1)
+                    catch (Exception e)
                     {
-                        throw new AssertionError(e1); // keys are valid string
+                        exceptionMessage = getExceptionMessage(e);
+                        success = false;
                     }
+                }
+
+                if (!success)
+                {
+                    System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+                                                                                                  session.getRetryTimes(),
+                                                                                                  new String(rawKey),
+                                                                                                  (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                     if (!session.ignoreErrors())
-                        break;
+                        return;
                 }
 
                 session.operationCount.getAndIncrement(index);
@@ -116,25 +122,36 @@ public class Reader extends OperationThr
 
             long start = System.currentTimeMillis();
 
-            try
+            boolean success = false;
+            String exceptionMessage = null;
+
+            for (int t = 0; t < session.getRetryTimes(); t++)
             {
-                List<ColumnOrSuperColumn> columns;
-                columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                if (success)
+                    break;
 
-                if (columns.size() == 0)
+                try
                 {
-                    System.err.println(String.format("Key %s not found.", new String(key)));
-
-                    if (!session.ignoreErrors())
-                        break;
+                    List<ColumnOrSuperColumn> columns;
+                    columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel());
+                    success = (columns.size() != 0);
+                }
+                catch (Exception e)
+                {
+                    exceptionMessage = getExceptionMessage(e);
+                    success = false;
                 }
             }
-            catch (Exception e)
+
+            if (!success)
             {
-                System.err.printf("Error while reading key %s - %s%n", new String(key), getExceptionMessage(e));
+                System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index,
+                                                                                              session.getRetryTimes(),
+                                                                                              new String(key),
+                                                                                              (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")");
 
                 if (!session.ignoreErrors())
-                    break;
+                    return;
             }
 
             session.operationCount.getAndIncrement(index);

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1071868
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1071868
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1071868
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1071868
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Feb 18 22:52:22 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071070
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1071683,1071867-1071868
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1071777
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1071868
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliMain.java Fri Feb 18 22:52:22 2011
@@ -90,32 +90,6 @@ public class CliMain
         thriftClient = cassandraClient;
         cliClient = new CliClient(sessionState, thriftClient);
         
-        if (sessionState.keyspace != null)
-        {
-            try
-            {
-                sessionState.keyspace = CliCompiler.getKeySpace(sessionState.keyspace, thriftClient.describe_keyspaces());;
-                thriftClient.set_keyspace(sessionState.keyspace);
-                cliClient.setKeySpace(sessionState.keyspace);
-                updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace)));
-            }
-            catch (InvalidRequestException e)
-            {
-                sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
-                return;
-            }
-            catch (TException e)
-            {
-                sessionState.err.println("Did you specify 'keyspace'?");
-                return;
-            }
-            catch (NotFoundException e)
-            {
-                sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
-                return;
-            }
-        }
-
         if ((sessionState.username != null) && (sessionState.password != null))
         {
             // Authenticate
@@ -149,6 +123,32 @@ public class CliMain
             }
         }
         
+        if (sessionState.keyspace != null)
+        {
+            try
+            {
+                sessionState.keyspace = CliCompiler.getKeySpace(sessionState.keyspace, thriftClient.describe_keyspaces());;
+                thriftClient.set_keyspace(sessionState.keyspace);
+                cliClient.setKeySpace(sessionState.keyspace);
+                updateCompletor(CliUtils.getCfNamesByKeySpace(cliClient.getKSMetaData(sessionState.keyspace)));
+            }
+            catch (InvalidRequestException e)
+            {
+                sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
+                return;
+            }
+            catch (TException e)
+            {
+                sessionState.err.println("Did you specify 'keyspace'?");
+                return;
+            }
+            catch (NotFoundException e)
+            {
+                sessionState.err.println("Keyspace " + sessionState.keyspace + " not found");
+                return;
+            }
+        }
+
         // Lookup the cluster name, this is to make it clear which cluster the user is connected to
         String clusterName;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Feb 18 22:52:22 2011
@@ -110,6 +110,7 @@ public class Config
     public Double reduce_cache_sizes_at = 1.0;
     public double reduce_cache_capacity_to = 0.6;
     public int hinted_handoff_throttle_delay_in_ms = 0;
+    public boolean compaction_preheat_key_cache = true;
 
     public static enum CommitLogSync {
         periodic,

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Feb 18 22:52:22 2011
@@ -1196,4 +1196,9 @@ public class DatabaseDescriptor
     {
         return conf.hinted_handoff_throttle_delay_in_ms;
     }
+
+    public static boolean getPreheatKeyCache()
+    {
+        return conf.compaction_preheat_key_cache;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Fri Feb 18 22:52:22 2011
@@ -431,12 +431,15 @@ public class CompactionManager implement
                 long position = writer.append(row);
                 totalkeysWritten++;
 
-                for (SSTableReader sstable : sstables)
+                if (DatabaseDescriptor.getPreheatKeyCache())
                 {
-                    if (sstable.getCachedPosition(row.key) != null)
+                    for (SSTableReader sstable : sstables)
                     {
-                        cachedKeys.put(row.key, position);
-                        break;
+                        if (sstable.getCachedPosition(row.key) != null)
+                        {
+                            cachedKeys.put(row.key, position);
+                            break;
+                        }
                     }
                 }
             }
@@ -448,7 +451,7 @@ public class CompactionManager implement
 
         SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
         cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
-        for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet())
+        for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
             ssTable.cacheKey(entry.getKey(), entry.getValue());
         submitMinorIfNeeded(cfs);
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1072184&r1=1072183&r2=1072184&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Feb 18 22:52:22 2011
@@ -28,6 +28,7 @@ import java.util.*;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IFilter;
@@ -155,4 +156,49 @@ public class StreamingTransferTest exten
         assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test"), new QueryPath("Standard1")));
         assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), new QueryPath("Standard1")));
     }
+
+    @Test
+    public void testTransferOfMultipleColumnFamilies() throws Exception
+    {
+        String keyspace = "Keyspace1";
+        IPartitioner p = StorageService.getPartitioner();
+        String[] columnFamilies = new String[] { "Standard1", "Standard2", "Standard3" };
+        List<SSTableReader> ssTableReaders = new ArrayList<SSTableReader>();
+
+        // ranges to transfer
+        List<Range> ranges = new ArrayList<Range>();
+
+        for (String cf : columnFamilies)
+        {
+            Set<String> content = new HashSet<String>();
+
+            content.add("data-" + cf + "-1");
+            content.add("data-" + cf + "-2");
+            content.add("data-" + cf + "-3");
+
+            SSTableUtils.Context context = SSTableUtils.prepare().ks(keyspace).cf(cf);
+
+            ssTableReaders.add(context.write(content));
+            ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("data-" + cf + "-3"))));
+        }
+
+        StreamOutSession session = StreamOutSession.create(keyspace, LOCAL, null);
+        StreamOut.transferSSTables(session, ssTableReaders, ranges);
+
+        session.await();
+
+        for (String cf : columnFamilies)
+        {
+            ColumnFamilyStore store = Table.open(keyspace).getColumnFamilyStore(cf);
+            List<Row> rows = Util.getRangeSlice(store);
+
+            assert rows.size() >= 3;
+
+            for (int i = 0; i < 3; i++)
+            {
+                String expectedKey = "data-" + cf + "-" + (i + 1);
+                assertEquals(p.decorateKey(ByteBufferUtil.bytes(expectedKey)), rows.get(i).key);
+            }
+        }
+    }
 }