You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/03/22 20:36:19 UTC

svn commit: r1084315 - in /cassandra/trunk: ./ conf/ contrib/ contrib/pig/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/cache/ src/java/org/apache/cassandra/c...

Author: jbellis
Date: Tue Mar 22 19:36:18 2011
New Revision: 1084315

URL: http://svn.apache.org/viewvc?rev=1084315&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/schema-sample.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/README.txt
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java
    cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
    cassandra/trunk/test/system/__init__.py
    cassandra/trunk/test/system/test_thrift_server.py
    cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7:1026516-1082796
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7:1026516-1084291
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Mar 22 19:36:18 2011
@@ -22,6 +22,12 @@
  * queue secondary indexes for flush before the parent (CASSANDRA-2330)
  * shut down server for OOM on a Thrift thread (CASSANDRA-2269)
  * reduce contention on Table.flusherLock (CASSANDRA-1954)
+ * fix comparator used for non-indexed secondary expressions in index scan
+   (CASSANDRA-2347)
+ * ensure size calculation and write phase of large-row compaction use
+   the same threshold for TTL expiration (CASSANDRA-2349)
+ * fix race when iterating CFs during add/drop (CASSANDRA-2350)
+ * add ConsistencyLevel command to CLI (CASSANDRA-2354)
 
 
 0.7.4

Modified: cassandra/trunk/conf/schema-sample.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/schema-sample.txt?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/conf/schema-sample.txt (original)
+++ cassandra/trunk/conf/schema-sample.txt Tue Mar 22 19:36:18 2011
@@ -1,7 +1,7 @@
 /*This file contains an example Keyspace that can be created using the
 cassandra-cli command line interface as follows.
 
-bin/cassandra-cli -host localhost --file conf/Keyspace1.txt
+bin/cassandra-cli -host localhost --file conf/schema-sample.txt
 
 The cassandra-cli includes online help that explains the statements below. You can
 access the help without connecting to a running cassandra instance by starting the

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1082796
+/cassandra/branches/cassandra-0.7/contrib:1026516-1084291
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Modified: cassandra/trunk/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Tue Mar 22 19:36:18 2011
@@ -47,6 +47,11 @@ grunt> orderednames = ORDER namecounts B
 grunt> topnames = LIMIT orderednames 50;
 grunt> dump topnames;
 
+Slices on columns can also be specified:
+grunt> rows = LOAD 'cassandra://Keyspace1/Standard1?slice_start=C2&slice_end=C4&limit=1&reversed=true' USING CassandraStorage();
+
+Binary values for slice_start and slice_end can be escaped such as '\u0255'
+
 Outputting to Cassandra requires the same format from input, so the simplest example is:
 
 grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Tue Mar 22 19:36:18 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -60,10 +61,16 @@ public class CassandraStorage extends Lo
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
+    private ByteBuffer slice_start = BOUND;
+    private ByteBuffer slice_end = BOUND;
+    private boolean slice_reverse = false;
+    private String keyspace;
+    private String column_family;
+
     private Configuration conf;
     private RecordReader reader;
     private RecordWriter writer;
-    private final int limit;
+    private int limit;
 
     public CassandraStorage() 
     { 
@@ -149,7 +156,7 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-    private String[] parseLocation(String location) throws IOException
+    private void setLocationFromUri(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
         String names[];
@@ -157,14 +164,30 @@ public class CassandraStorage extends Lo
         {
             if (!location.startsWith("cassandra://"))
                 throw new Exception("Bad scheme.");
-            String[] parts = location.split("/+");
-            names = new String[]{ parts[1], parts[2] };
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                for (String param : urlParts[1].split("&"))
+                {
+                    String[] pair = param.split("=");
+                    if (pair[0].equals("slice_start"))
+                        slice_start = ByteBufferUtil.bytes(pair[1]);
+                    else if (pair[0].equals("slice_end"))
+                        slice_end = ByteBufferUtil.bytes(pair[1]);
+                    else if (pair[0].equals("reversed"))
+                        slice_reverse = Boolean.parseBoolean(pair[1]);
+                    else if (pair[0].equals("limit"))
+                        limit = Integer.parseInt(pair[1]);
+                }
+            }
+            String[] parts = urlParts[0].split("/+");
+            keyspace = parts[1];
+            column_family = parts[2];
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage());
         }
-       return names;
     }
 
     private void setConnectionInformation() throws IOException
@@ -186,12 +209,15 @@ public class CassandraStorage extends Lo
     @Override
     public void setLocation(String location, Job job) throws IOException
     {
-        SliceRange range = new SliceRange(BOUND, BOUND, false, limit);
-        SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
         conf = job.getConfiguration();
-        ConfigHelper.setInputSlicePredicate(conf, predicate);
-        String[] names = parseLocation(location);
-        ConfigHelper.setInputColumnFamily(conf, names[0], names[1]);
+        setLocationFromUri(location);
+        if (ConfigHelper.getRawInputSlicePredicate(conf) == null)
+        {
+            SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
+            SlicePredicate predicate = new SlicePredicate().setSlice_range(range);
+            ConfigHelper.setInputSlicePredicate(conf, predicate);
+        }
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
     }
 
@@ -214,8 +240,8 @@ public class CassandraStorage extends Lo
     public void setStoreLocation(String location, Job job) throws IOException
     {
         conf = job.getConfiguration();
-        String[] names = parseLocation(location);
-        ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]);
+        setLocationFromUri(location);
+        ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
     }
 

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1084291
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1084291
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1084291
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1084291
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 22 19:36:18 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1082796
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1084291
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cache/InstrumentedCache.java Tue Mar 22 19:36:18 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.cache;
  */
 
 
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -139,4 +140,9 @@ public class InstrumentedCache<K, V>
     {
         return map.keySet();
     }
+
+    public Set<Map.Entry<K, V>> getEntrySet()
+    {
+        return map.entrySet();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/Cli.g Tue Mar 22 19:36:18 2011
@@ -58,6 +58,7 @@ tokens {
     NODE_LIST;
     NODE_TRUNCATE;
     NODE_ASSUME;
+    NODE_CONSISTENCY_LEVEL;
 
     // Internal Nodes.
     NODE_COLUMN_ACCESS;
@@ -155,6 +156,7 @@ statement
     | listStatement
     | truncateStatement
     | assumeStatement
+    | consistencyLevelStatement
     | -> ^(NODE_NO_OP)
     ;
 
@@ -212,6 +214,8 @@ helpStatement
         -> ^(NODE_HELP NODE_TRUNCATE)
     | HELP ASSUME
         -> ^(NODE_HELP NODE_ASSUME)
+    | HELP CONSISTENCYLEVEL
+        -> ^(NODE_HELP NODE_CONSISTENCY_LEVEL)
     | HELP 
         -> ^(NODE_HELP)
     | '?'    
@@ -279,6 +283,11 @@ assumeStatement
         -> ^(NODE_ASSUME columnFamily $assumptionElement $defaultType)
     ;
 
+consistencyLevelStatement
+    : CONSISTENCYLEVEL 'AS' defaultType=Identifier
+        -> ^(NODE_CONSISTENCY_LEVEL $defaultType)
+    ;
+
 showClusterName
     : SHOW 'CLUSTER NAME'
         -> ^(NODE_SHOW_CLUSTER_NAME)
@@ -533,6 +542,7 @@ LIMIT:       'LIMIT';
 TRUNCATE:    'TRUNCATE';
 ASSUME:      'ASSUME';
 TTL:         'TTL';
+CONSISTENCYLEVEL:   'CONSISTENCYLEVEL';
 
 IP_ADDRESS 
     : IntegerLiteral '.' IntegerLiteral '.' IntegerLiteral '.' IntegerLiteral

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliClient.java Tue Mar 22 19:36:18 2011
@@ -106,8 +106,8 @@ public class CliClient extends CliUserHe
     private String username = null;
     private Map<String, KsDef> keyspacesMap = new HashMap<String, KsDef>();
     private Map<String, AbstractType> cfKeysComparators;
-    
-    
+    private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;   
+ 
     public CliClient(CliSessionState cliSessionState, Cassandra.Client thriftClient)
     {
         this.sessionState = cliSessionState;
@@ -193,6 +193,9 @@ public class CliClient extends CliUserHe
                 case CliParser.NODE_ASSUME:
                     executeAssumeStatement(tree);
                     break;
+                case CliParser.NODE_CONSISTENCY_LEVEL:
+                    executeConsistencyLevelStatement(tree);
+                    break;
                 case CliParser.NODE_NO_OP:
                     // comment lines come here; they are treated as no ops.
                     break;
@@ -256,7 +259,7 @@ public class CliClient extends CliUserHe
         SliceRange range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
         SlicePredicate predicate = new SlicePredicate().setColumn_names(null).setSlice_range(range);
 
-        int count = thriftClient.get_count(ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)), colParent, predicate, ConsistencyLevel.ONE);
+        int count = thriftClient.get_count(ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)), colParent, predicate, consistencyLevel);
         sessionState.out.printf("%d columns%n", count);
     }
     
@@ -306,7 +309,7 @@ public class CliClient extends CliUserHe
             path.setColumn(columnName);
 
         thriftClient.remove(ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)), path,
-                             FBUtilities.timestampMicros(), ConsistencyLevel.ONE);
+                             FBUtilities.timestampMicros(), consistencyLevel);
         sessionState.out.println(String.format("%s removed.", (columnSpecCnt == 0) ? "row" : "column"));
     }
 
@@ -319,7 +322,7 @@ public class CliClient extends CliUserHe
             parent.setSuper_column(superColumnName);
 
         SliceRange range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1000000);
-        List<ColumnOrSuperColumn> columns = thriftClient.get_slice(key, parent, new SlicePredicate().setColumn_names(null).setSlice_range(range), ConsistencyLevel.ONE);
+        List<ColumnOrSuperColumn> columns = thriftClient.get_slice(key, parent, new SlicePredicate().setColumn_names(null).setSlice_range(range), consistencyLevel);
 
         AbstractType validator;
         CfDef cfDef = getCfDef(columnFamily);
@@ -445,7 +448,7 @@ public class CliClient extends CliUserHe
         Column column;
         try
         {
-            column = thriftClient.get(key, path, ConsistencyLevel.ONE).column;
+            column = thriftClient.get(key, path, consistencyLevel).column;
         }
         catch (NotFoundException e)
         {
@@ -571,7 +574,7 @@ public class CliClient extends CliUserHe
         try
         {
             ColumnParent parent = new ColumnParent(columnFamily);
-            slices = thriftClient.get_indexed_slices(parent, clause, predicate, ConsistencyLevel.ONE);
+            slices = thriftClient.get_indexed_slices(parent, clause, predicate, consistencyLevel);
             printSliceList(columnFamilyDef, slices);
         }
         catch (InvalidRequestException e)
@@ -668,7 +671,7 @@ public class CliClient extends CliUserHe
         }
 
         // do the insert
-        thriftClient.insert(getKeyAsBytes(columnFamily, keyTree), parent, columnToInsert, ConsistencyLevel.ONE);
+        thriftClient.insert(getKeyAsBytes(columnFamily, keyTree), parent, columnToInsert, consistencyLevel);
         sessionState.out.println("Value inserted.");
     }
 
@@ -1061,7 +1064,7 @@ public class CliClient extends CliUserHe
         range.setStart_key(startKey).setEnd_key(endKey);
 
         ColumnParent columnParent = new ColumnParent(columnFamily);
-        List<KeySlice> keySlices = thriftClient.get_range_slices(columnParent, predicate, range, ConsistencyLevel.ONE);
+        List<KeySlice> keySlices = thriftClient.get_range_slices(columnParent, predicate, range, consistencyLevel);
         printSliceList(columnFamilyDef, keySlices);
     }
 
@@ -1090,6 +1093,32 @@ public class CliClient extends CliUserHe
     }
 
     /**
+     * Command: CONSISTENCYLEVEL AS (ONE | QUORUM ...)
+     * Tree: ^(NODE_CONSISTENCY_LEVEL (ONE | QUORUM ...))
+     * @param statement - tree representing current statement
+     */
+    private void executeConsistencyLevelStatement(Tree statement)
+    {
+        if (!CliMain.isConnected())
+            return;
+
+        String userSuppliedLevel = statement.getChild(0).getText().toUpperCase();
+
+        try
+        {
+            consistencyLevel = ConsistencyLevel.valueOf(userSuppliedLevel);
+        }
+        catch (IllegalArgumentException e)
+        {
+            String elements = "ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, ANY";
+            sessionState.out.println(String.format("'%s' is invalid. Available: %s", userSuppliedLevel, elements));
+            return;
+        }
+
+        sessionState.out.println(String.format("Consistency level is set to '%s'.", consistencyLevel));
+    }
+
+    /**
      * Command: ASSUME <columnFamily> (VALIDATOR | COMPARATOR | KEYS | SUB_COMPARATOR) AS <type>
      * Tree: ^(NODE_ASSUME <columnFamily> (VALIDATOR | COMPARATOR | KEYS | SUB_COMPARATOR) <type>))
      * @param statement - tree representing current statement

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliCompleter.java Tue Mar 22 19:36:18 2011
@@ -36,6 +36,7 @@ public class CliCompleter extends Simple
             "drop column family",
             "rename keyspace",
             "rename column family",
+            "consistencylevel",
             
             "help connect",
             "help describe keyspace",
@@ -56,7 +57,8 @@ public class CliCompleter extends Simple
             "help del",
             "help count",
             "help list",
-            "help truncate"
+            "help truncate",
+            "help consistencylevel"
     };
     private static String[] keyspaceCommands = {
             "get",

Modified: cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cli/CliUserHelp.java Tue Mar 22 19:36:18 2011
@@ -327,7 +327,11 @@ public class CliUserHelp {
                 state.out.println("example:");
                 state.out.println("assume Users comparator as lexicaluuid;");
                 break;
-
+            case CliParser.NODE_CONSISTENCY_LEVEL:
+                state.out.println("consistencylevel as <level>");
+                state.out.println("example:");
+                state.out.println("consistencylevel as QUORUM");
+                break;
             default:
                 state.out.println("?");
                 break;
@@ -374,6 +378,8 @@ public class CliUserHelp {
             state.out.println("truncate <column_family>;                      Truncate specified column family.");
             state.out.println("assume <column_family> <attribute> as <type>;");
             state.out.println("              Assume a given column family attributes to match a specified type.");
+            state.out.println("consistencylevel as <level>;");
+            state.out.println("                  Change the consistency level for set,get, and list operations.");
             state.out.println("list <cf>;                                   List all rows in the column family.");
             state.out.println("list <cf>[<startKey>:];");
             state.out.println("                       List rows in the column family beginning with <startKey>.");

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Mar 22 19:36:18 2011
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.AbstractCommutativeType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IColumnSerializer;
-import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.IIterableColumns;
 import org.apache.cassandra.utils.FBUtilities;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Tue Mar 22 19:36:18 2011
@@ -132,7 +132,7 @@ public class ColumnFamilySerializer impl
         ColumnFamilyStore interner = intern ? Table.open(CFMetaData.getCF(cf.id()).left).getColumnFamilyStore(cf.id()) : null;
         for (int i = 0; i < size; ++i)
         {
-            IColumn column = cf.getColumnSerializer().deserialize(dis, interner, fromRemote);
+            IColumn column = cf.getColumnSerializer().deserialize(dis, interner, fromRemote, (int) (System.currentTimeMillis() / 1000));
             cf.addColumn(column);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Mar 22 19:36:18 2011
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.cache.AutoSavingKeyCache;
 import org.apache.cassandra.cache.AutoSavingRowCache;
+import org.apache.cassandra.cache.JMXInstrumentedCache;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -1624,7 +1625,7 @@ public class ColumnFamilyStore implement
             IColumn column = data.getColumn(expression.column_name);
             if (column == null)
                 return false;
-            int v = data.getComparator().compare(column.value(), expression.value);
+            int v = data.metadata().getValueValidator(expression.column_name).compare(column.value(), expression.value);
             if (!satisfies(v, expression.op))
                 return false;
         }
@@ -1769,6 +1770,16 @@ public class ColumnFamilyStore implement
         return keyCache.getSize();
     }
 
+    public JMXInstrumentedCache<DecoratedKey, ColumnFamily> getRowCache()
+    {
+        return ssTables.getRowCache();
+    }
+
+    public JMXInstrumentedCache<Pair<Descriptor, DecoratedKey>, Long> getKeyCache()
+    {
+        return ssTables.getKeyCache();
+    }
+
     public static Iterable<ColumnFamilyStore> all()
     {
         Iterable<ColumnFamilyStore>[] stores = new Iterable[DatabaseDescriptor.getTables().size()];

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Tue Mar 22 19:36:18 2011
@@ -25,14 +25,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnSerializer implements IColumnSerializer
@@ -81,6 +79,11 @@ public class ColumnSerializer implements
      */
     public Column deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote) throws IOException
     {
+        return deserialize(dis, interner, fromRemote, (int) (System.currentTimeMillis() / 1000));
+    }
+
+    public Column deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException
+    {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
             throw new CorruptColumnException("invalid column name length " + name.remaining());
@@ -103,7 +106,7 @@ public class ColumnSerializer implements
             int expiration = dis.readInt();
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            if ((int) (System.currentTimeMillis() / 1000 ) > expiration)
+            if (expiration < expireBefore)
             {
                 // the column is now expired, we can safely return a simple
                 // tombstone

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue Mar 22 19:36:18 2011
@@ -375,8 +375,7 @@ public class CompactionManager implement
     {
         for (String ksname : DatabaseDescriptor.getNonSystemTables())
         {
-            Table ks = Table.open(ksname);
-            for (ColumnFamilyStore cfs : ks.columnFamilyStores.values())
+            for (ColumnFamilyStore cfs : Table.open(ksname).getColumnFamilyStores())
                 cfs.disableAutoCompaction();
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Tue Mar 22 19:36:18 2011
@@ -371,6 +371,11 @@ class SuperColumnSerializer implements I
 
     public IColumn deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote) throws IOException
     {
+        return deserialize(dis, interner, fromRemote, (int)(System.currentTimeMillis() / 1000));
+    }
+
+    public IColumn deserialize(DataInput dis, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException
+    {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         int localDeleteTime = dis.readInt();
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
@@ -382,7 +387,7 @@ class SuperColumnSerializer implements I
         /* read the number of columns */
         int size = dis.readInt();
         ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, interner, size, fromRemote);
+        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, interner, size, fromRemote, expireBefore);
         SuperColumn superColumn = new SuperColumn(name, new ConcurrentSkipListMap<ByteBuffer,IColumn>(preSortedMap));
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Mar 22 19:36:18 2011
@@ -23,10 +23,7 @@ import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -91,7 +88,7 @@ public class Table
     /* Table name. */
     public final String name;
     /* ColumnFamilyStore per column family */
-    public final Map<Integer, ColumnFamilyStore> columnFamilyStores = new HashMap<Integer, ColumnFamilyStore>(); // TODO make private again
+    private final Map<Integer, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<Integer, ColumnFamilyStore>();
     private final Object[] indexLocks;
     private ScheduledFuture<?> flushTask;
     private volatile AbstractReplicationStrategy replicationStrategy;

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Tue Mar 22 19:36:18 2011
@@ -159,6 +159,11 @@ public class ConfigHelper
         return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG));
     }
 
+    public static String getRawInputSlicePredicate(Configuration conf)
+    {
+        return conf.get(INPUT_PREDICATE_CONFIG);
+    }
+
     private static String predicateToString(SlicePredicate predicate)
     {
         assert predicate != null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java Tue Mar 22 19:36:18 2011
@@ -28,5 +28,5 @@ import org.apache.cassandra.db.IColumn;
 
 public interface IColumnSerializer extends ICompactSerializer2<IColumn>
 {
-    public IColumn deserialize(DataInput in, ColumnFamilyStore interner, boolean fromRemote) throws IOException;
+    public IColumn deserialize(DataInput in, ColumnFamilyStore interner, boolean fromRemote, int expireBefore) throws IOException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Tue Mar 22 19:36:18 2011
@@ -52,6 +52,9 @@ public class SSTableIdentityIterator imp
     public final int columnCount;
     private final long columnPosition;
 
+    // Fixed at construction so that LazilyCompactedRow sees the same column expiration state on both its first and second deserialization pass
+    private final int expireBefore;
+
     /**
      * Used to iterate through the columns of a row.
      * @param sstable SSTable we are reading from.
@@ -75,6 +78,7 @@ public class SSTableIdentityIterator imp
         this.key = key;
         this.dataStart = dataStart;
         this.dataSize = dataSize;
+        this.expireBefore = (int)(System.currentTimeMillis() / 1000);
         finishedAt = dataStart + dataSize;
 
         try
@@ -137,7 +141,7 @@ public class SSTableIdentityIterator imp
     {
         try
         {
-            return sstable.getColumnSerializer().deserialize(file);
+            return sstable.getColumnSerializer().deserialize(file, null, false, expireBefore);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Mar 22 19:36:18 2011
@@ -39,7 +39,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
@@ -575,7 +575,7 @@ public class SSTableReader extends SSTab
         return ColumnFamily.create(metadata);
     }
 
-    public ICompactSerializer2<IColumn> getColumnSerializer()
+    public IColumnSerializer getColumnSerializer()
     {
         return metadata.cfType == ColumnFamilyType.Standard
                ? Column.serializer()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Tue Mar 22 19:36:18 2011
@@ -45,8 +45,9 @@ public class ColumnSortedMap implements 
     private final int length;
     private final ColumnFamilyStore interner;
     private final boolean fromRemote;
+    private final int expireBefore;
 
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote)
+    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
     {
         this.comparator = comparator;
         this.serializer = serializer;
@@ -54,6 +55,7 @@ public class ColumnSortedMap implements 
         this.dis = dis;
         this.length = length;
         this.fromRemote = fromRemote;
+        this.expireBefore = expireBefore;
     }
 
     public int size()
@@ -143,7 +145,7 @@ public class ColumnSortedMap implements 
 
     public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
     {
-        return new ColumnSet(serializer, dis, interner, length, fromRemote);
+        return new ColumnSet(serializer, dis, interner, length, fromRemote, expireBefore);
     }
 }
 
@@ -154,14 +156,16 @@ class ColumnSet implements Set<Map.Entry
     private final int length;
     private final ColumnFamilyStore interner;
     private boolean fromRemote;
+    private final int expireBefore;
 
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote)
+    public ColumnSet(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
     {
         this.serializer = serializer;
         this.dis = dis;
         this.interner = interner;
         this.length = length;
         this.fromRemote = fromRemote;
+        this.expireBefore = expireBefore;
     }
 
     public int size()
@@ -181,7 +185,7 @@ class ColumnSet implements Set<Map.Entry
 
     public Iterator<Entry<ByteBuffer, IColumn>> iterator()
     {
-        return new ColumnIterator(serializer, dis, interner, length, fromRemote);
+        return new ColumnIterator(serializer, dis, interner, length, fromRemote, expireBefore);
     }
 
     public Object[] toArray()
@@ -237,14 +241,16 @@ class ColumnIterator implements Iterator
     private final boolean fromRemote;
     private int count = 0;
     private ColumnFamilyStore interner;
+    private final int expireBefore;
 
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote)
+    public ColumnIterator(ColumnSerializer serializer, DataInput dis, ColumnFamilyStore interner, int length, boolean fromRemote, int expireBefore)
     {
         this.dis = dis;
         this.serializer = serializer;
         this.interner = interner;
         this.length = length;
         this.fromRemote = fromRemote;
+        this.expireBefore = expireBefore;
     }
 
     private IColumn deserializeNext()
@@ -252,7 +258,7 @@ class ColumnIterator implements Iterator
         try
         {
             count++;
-            return serializer.deserialize(dis, interner, fromRemote);
+            return serializer.deserialize(dis, interner, fromRemote, expireBefore);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/test/system/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/__init__.py?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/system/__init__.py (original)
+++ cassandra/trunk/test/system/__init__.py Tue Mar 22 19:36:18 2011
@@ -161,8 +161,9 @@ class ThriftTester(BaseTester):
             Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type'),
             Cassandra.CfDef('Keyspace1', 'Counter1', default_validation_class='CounterColumnType'),
             Cassandra.CfDef('Keyspace1', 'SuperCounter1', column_type='Super', default_validation_class='CounterColumnType'),
-            Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
-            Cassandra.CfDef('Keyspace1', 'Indexed2', comparator_type='TimeUUIDType', column_metadata=[Cassandra.ColumnDef(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
+            Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate_index')]),
+            Cassandra.CfDef('Keyspace1', 'Indexed2', comparator_type='TimeUUIDType', column_metadata=[Cassandra.ColumnDef(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, 'LongType', Cassandra.IndexType.KEYS)]),
+            Cassandra.CfDef('Keyspace1', 'Indexed3', comparator_type='TimeUUIDType', column_metadata=[Cassandra.ColumnDef(uuid.UUID('00000000-0000-1000-0000-000000000000').bytes, 'UTF8Type', Cassandra.IndexType.KEYS)]),
 
         ])
 

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Tue Mar 22 19:36:18 2011
@@ -1767,7 +1767,22 @@ class TestMutations(ThriftTester):
         assert result[0].key == 'key3'
         assert len(result[0].columns) == 2, result[0].columns
         
-        cp = ColumnParent('Indexed2')
+    def test_index_scan_uuid_names(self):
+        _set_keyspace('Keyspace1')
+        sp = SlicePredicate(slice_range=SliceRange('', ''))
+        cp = ColumnParent('Indexed3') # timeuuid name, utf8 values
+        u = uuid.UUID('00000000-0000-1000-0000-000000000000').bytes
+        u2 = uuid.UUID('00000000-0000-1000-0000-000000000001').bytes
+        client.insert('key1', ColumnParent('Indexed3'), Column(u, 'a', 0), ConsistencyLevel.ONE)
+        client.insert('key1', ColumnParent('Indexed3'), Column(u2, 'b', 0), ConsistencyLevel.ONE)
+        # name comparator + data validator of incompatible types -- see CASSANDRA-2347
+        clause = IndexClause([IndexExpression(u, IndexOperator.EQ, 'a'),
+                              IndexExpression(u2, IndexOperator.EQ, 'b')], '')
+        result = client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE)
+        assert len(result) == 1, result
+
+        cp = ColumnParent('Indexed2') # timeuuid name, long values
+
         # name must be valid (TimeUUID)
         clause = IndexClause([IndexExpression('foo', IndexOperator.EQ, uuid.UUID('00000000-0000-1000-0000-000000000000').bytes)], '')
         _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp, ConsistencyLevel.ONE), InvalidRequestException)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java Tue Mar 22 19:36:18 2011
@@ -20,15 +20,21 @@ package org.apache.cassandra;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CleanupHelper extends SchemaLoader
 {
@@ -76,4 +82,30 @@ public class CleanupHelper extends Schem
             throw new RuntimeException(e);
         }
     }
+
+    protected void insertData(String keyspace, String columnFamily, int offset, int numberOfRows) throws IOException
+    {
+        for (int i = offset; i < offset + numberOfRows; i++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes("key" + i);
+            RowMutation rowMutation = new RowMutation(keyspace, key);
+            QueryPath path = new QueryPath(columnFamily, null, ByteBufferUtil.bytes("col" + i));
+
+            rowMutation.add(path, ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
+            rowMutation.applyUnsafe();
+        }
+    }
+
+    /* usually used to populate the cache */
+    protected void readData(String keyspace, String columnFamily, int offset, int numberOfRows) throws IOException
+    {
+        ColumnFamilyStore store = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+        for (int i = offset; i < offset + numberOfRows; i++)
+        {
+            DecoratedKey key = Util.dk("key" + i);
+            QueryPath path = new QueryPath(columnFamily, null, ByteBufferUtil.bytes("col" + i));
+
+            store.getColumnFamily(key, path, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1);
+        }
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Tue Mar 22 19:36:18 2011
@@ -22,6 +22,8 @@ package org.apache.cassandra.db;
 
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.Test;
@@ -29,22 +31,71 @@ import org.junit.Test;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 
 public class KeyCacheTest extends CleanupHelper
 {
     private static final String TABLE1 = "KeyCacheSpace";
+    private static final String COLUMN_FAMILY1 = "Standard1";
+    private static final String COLUMN_FAMILY2 = "Standard2";
 
     @Test
     public void testKeyCache50() throws IOException, ExecutionException, InterruptedException
     {
-        testKeyCache("Standard1", 64);
+        testKeyCache(COLUMN_FAMILY1, 64);
     }
 
     @Test
     public void testKeyCache100() throws IOException, ExecutionException, InterruptedException
     {
-        testKeyCache("Standard2", 128);
+        testKeyCache(COLUMN_FAMILY2, 128);
+    }
+
+    @Test
+    public void testKeyCacheLoad() throws Exception
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        ColumnFamilyStore store = Table.open(TABLE1).getColumnFamilyStore(COLUMN_FAMILY2);
+
+        // empty the cache
+        store.invalidateKeyCache();
+        assert store.getKeyCacheSize() == 0;
+
+        // insert data and force to disk
+        insertData(TABLE1, COLUMN_FAMILY2, 0, 100);
+        store.forceBlockingFlush();
+
+        // populate the cache
+        readData(TABLE1, COLUMN_FAMILY2, 0, 100);
+        assert store.getKeyCacheSize() == 100;
+
+        // our caches don't implement the Map interface (hence no putAll), so copy the entries one by one
+        Map<Pair<Descriptor, DecoratedKey>, Long> savedMap = new HashMap<Pair<Descriptor, DecoratedKey>, Long>();
+        for (Map.Entry<Pair<Descriptor, DecoratedKey>, Long> entry : store.getKeyCache().getEntrySet())
+        {
+            savedMap.put(entry.getKey(), entry.getValue());
+        }
+
+        // force the cache to disk
+        store.keyCache.submitWrite().get();
+
+        // empty the cache again to make sure values came from disk
+        store.invalidateKeyCache();
+        assert store.getKeyCacheSize() == 0;
+
+        // load the cache from disk
+        store.unregisterMBean(); // unregistering old MBean to test how key cache will be loaded
+        ColumnFamilyStore newStore = ColumnFamilyStore.createColumnFamilyStore(Table.open(TABLE1), COLUMN_FAMILY2);
+        assert newStore.getKeyCacheSize() == 100;
+
+        assert savedMap.size() == 100;
+        for (Map.Entry<Pair<Descriptor, DecoratedKey>, Long> entry : savedMap.entrySet())
+        {
+            assert newStore.getKeyCache().get(entry.getKey()).equals(entry.getValue());
+        }
     }
 
     public void testKeyCache(String cfName, int expectedCacheSize) throws IOException, ExecutionException, InterruptedException
@@ -87,4 +138,5 @@ public class KeyCacheTest extends Cleanu
         CompactionManager.instance.submitMajor(store, 0, Integer.MAX_VALUE).get();
         keyCacheSize = store.getKeyCacheCapacity();
         assert keyCacheSize == 1 : keyCacheSize;
-    }}
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java?rev=1084315&r1=1084314&r2=1084315&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowCacheTest.java Tue Mar 22 19:36:18 2011
@@ -18,33 +18,33 @@
 
 package org.apache.cassandra.db;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 
-import org.apache.cassandra.CleanupHelper;
+import org.junit.Test;
 
+import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import org.junit.Test;
-
 public class RowCacheTest extends CleanupHelper
 {
+    private String KEYSPACE = "RowCacheSpace";
+    private String COLUMN_FAMILY_WITH_CACHE = "CachedCF";
+    private String COLUMN_FAMILY_WITHOUT_CACHE = "CFWithoutCache";
+
     @Test
     public void testRowCache() throws Exception
     {
-        String KEYSPACE = "RowCacheSpace";
-        String COLUMN_FAMILY_WITH_CACHE = "CachedCF";
-        String COLUMN_FAMILY_WITHOUT_CACHE = "CFWithoutCache";
-
         CompactionManager.instance.disableAutoCompaction();
 
         Table table = Table.open(KEYSPACE);
         ColumnFamilyStore cachedStore  = table.getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE);
         ColumnFamilyStore noCacheStore = table.getColumnFamilyStore(COLUMN_FAMILY_WITHOUT_CACHE);
 
+        // empty the row cache
+        cachedStore.invalidateRowCache();
+
         // inserting 100 rows into both column families
         insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
         insertData(KEYSPACE, COLUMN_FAMILY_WITHOUT_CACHE, 0, 100);
@@ -109,17 +109,37 @@ public class RowCacheTest extends Cleanu
         }
     }
 
-    private void insertData(String keyspace, String columnFamily, int offset, int numberOfRows) throws IOException
+    @Test
+    public void testRowCacheLoad() throws Exception
     {
-        for (int i = offset; i < offset + numberOfRows; i++)
-        {
-            ByteBuffer key = ByteBufferUtil.bytes("key" + i);
-            RowMutation rowMutation = new RowMutation(keyspace, key);
-            QueryPath path = new QueryPath(columnFamily, null, ByteBufferUtil.bytes("col" + i));
+        CompactionManager.instance.disableAutoCompaction();
+
+        ColumnFamilyStore store = Table.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY_WITH_CACHE);
+
+        // empty the cache
+        store.invalidateRowCache();
+        assert store.getRowCacheSize() == 0;
+
+        // insert data and fill the cache
+        insertData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
+        readData(KEYSPACE, COLUMN_FAMILY_WITH_CACHE, 0, 100);
+        assert store.getRowCacheSize() == 100;
 
-            rowMutation.add(path, ByteBufferUtil.bytes("val" + i), System.currentTimeMillis());
-            rowMutation.applyUnsafe();
+        // force the cache to disk
+        store.rowCache.submitWrite().get();
+
+        // empty the cache again to make sure values came from disk
+        store.invalidateRowCache();
+        assert store.getRowCacheSize() == 0;
+
+        // load the cache from disk
+        store.rowCache.readSaved();
+        assert store.getRowCacheSize() == 100;
+
+        for (int i = 0; i < 100; i++)
+        {
+            // verify the correct data was found
+            assert store.getRawCachedRow(Util.dk("key" + i)).getColumn(ByteBufferUtil.bytes("col" + i)).value().equals(ByteBufferUtil.bytes("val" + i));
         }
     }
-
 }