You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2011/11/11 17:55:34 UTC

svn commit: r1200948 - in /cassandra/trunk: ./ bin/ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/cql/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cas...

Author: xedin
Date: Fri Nov 11 16:55:33 2011
New Revision: 1200948

URL: http://svn.apache.org/viewvc?rev=1200948&view=rev
Log:
merge from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/bin/cqlsh   (props changed)
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
    cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7:1026516-1183000
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0:1167085-1200945
 /cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Nov 11 16:55:33 2011
@@ -16,6 +16,9 @@
  * automatically compute sha1 sum for uncompressed data files (CASSANDRA-3456)
  * fix reading metadata/statistics component for version < h (CASSANDRA-3474)
  * add sstable forward-compatibility (CASSANDRA-3478)
+ * report compression ratio in CFSMBean (CASSANDRA-3393)
+ * fix incorrect size exception during streaming of counters (CASSANDRA-3481)
+ * (CQL) fix for counter decrement syntax (CASSANDRA-3418)
 Merged from 0.8:
  * Make counter shard merging thread safe (CASSANDRA-3178)
  * fix updating CF row_cache_provider (CASSANDRA-3414)
@@ -26,7 +29,7 @@ Merged from 0.8:
  * Revert CASSANDRA-2855
  * Fix bug preventing the use of efficient cross-DC writes (CASSANDRA-3472)
  * `describe ring` command for CLI (CASSANDRA-3220)
-
+ * (Hadoop) skip empty rows when entire row is requested, redux (CASSANDRA-2855)
 
 1.0.2
  * "defragment" rows for name-based queries under STCS (CASSANDRA-2503)
@@ -46,6 +49,7 @@ Merged from 0.8:
  * acquire compactionlock during truncate (CASSANDRA-3399)
  * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
 
+
 1.0.1
  * acquire references during index build to prevent delete problems
    on Windows (CASSANDRA-3314)
@@ -124,6 +128,7 @@ Merged from 0.8:
  * remove incorrect optimization from slice read path (CASSANDRA-3390)
  * Fix race in AntiEntropyService (CASSANDRA-3400)
 
+
 1.0.0-final
  * close scrubbed sstable fd before deleting it (CASSANDRA-3318)
  * fix bug preventing obsolete commitlog segments from being removed
@@ -143,6 +148,7 @@ Merged from 0.8:
  * Fix hsha thrift server (CASSANDRA-3346)
  * Make sure repair only stream needed sstables (CASSANDRA-3345)
 
+
 1.0.0-rc2
  * Log a meaningful warning when a node receives a message for a repair session
    that doesn't exist anymore (CASSANDRA-3256)
@@ -165,6 +171,7 @@ Merged from 0.8:
  * Evict gossip state immediately when a token is taken over by a new IP 
    (CASSANDRA-3259)
 
+
 1.0.0-rc1
  * Update CQL to generate microsecond timestamps by default (CASSANDRA-3227)
  * Fix counting CFMetadata towards Memtable liveRatio (CASSANDRA-3023)
@@ -187,6 +194,7 @@ Merged from 0.8:
  * File descriptor limit increased in packaging (CASSANDRA-3206)
  * Fix deadlock in commit log during flush (CASSANDRA-3253) 
 
+
 1.0.0-beta1
  * removed binarymemtable (CASSANDRA-2692)
  * add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427)
@@ -273,6 +281,7 @@ Merged from 0.8:
  * Pluggable compaction strategy (CASSANDRA-1610)
  * Add new broadcast_address config option (CASSANDRA-2491)
 
+
 0.8.7
  * Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)
  * Allow using quotes in "USE <keyspace>;" CLI command (CASSANDRA-3208)
@@ -308,6 +317,7 @@ Merged from 0.8:
  * Allow using number as DC name when creating keyspace in CQL (CASSANDRA-3239)
  * Force flush of system table after updating/removing a token (CASSANDRA-3243)
 
+
 0.8.6
  * revert CASSANDRA-2388
  * change TokenRange.endpoints back to listen/broadcast address to match

Propchange: cassandra/trunk/bin/cqlsh
------------------------------------------------------------------------------
    svn:executable = *

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
 /cassandra/branches/cassandra-0.7/contrib:1026516-1183000
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/contrib:1167085-1200945
 /cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1183000
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1200945
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1183000
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1200945
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1183000
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1200945
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1183000
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1200945
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Nov 11 16:55:33 2011
@@ -1,10 +1,10 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1183000
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1199259,1199284,1200226,1200471
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1200153,1200227
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1200945
 /cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Fri Nov 11 16:55:33 2011
@@ -59,6 +59,13 @@ options {
         if (recognitionErrors.size() > 0)
             throw new InvalidRequestException(recognitionErrors.get((recognitionErrors.size()-1)));
     }
+
+    // used by UPDATE of the counter columns to validate if '-' was supplied by user
+    public void validateMinusSupplied(Object op, final Term value, IntStream stream) throws MissingTokenException
+    {
+        if (op == null && Long.parseLong(value.getText()) > 0)
+            throw new MissingTokenException(102, stream, value);
+    }
 }
 
 @lexer::header {
@@ -455,10 +462,16 @@ termPair[Map<Term, Term> columns]
     :   key=term '=' value=term { columns.put(key, value); }
     ;
 
+intTerm returns [Term integer]
+    : t=INTEGER { $integer = new Term($t.text, $t.type); }
+    ;
+
 termPairWithOperation[Map<Term, Operation> columns]
     : key=term '=' (value=term { columns.put(key, new Operation(value)); }
-		    | c=term ( '+' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.PLUS, v)); }
-                            | '-' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.MINUS, v)); } ))
+		               | c=term ( '+'  v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.PLUS, v)); }
+                            | op='-'? v=intTerm
+                                  { validateMinusSupplied(op, v, input);
+                                    columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.MINUS, v)); } ))
     ;
 
 // Note: ranges are inclusive so >= and >, and < and <= all have the same semantics.  

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Fri Nov 11 16:55:33 2011
@@ -220,7 +220,7 @@ public class UpdateStatement extends Abs
 
                     if (op.type == OperationType.MINUS)
                     {
-                        value *= -1;
+                        if (value > 0) value *= -1;
                     }
                 }
                 catch (NumberFormatException e)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java Fri Nov 11 16:55:33 2011
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.sstable.SSTableMetadata;
 
@@ -114,10 +115,10 @@ public class ColumnFamilySerializer impl
 
     public ColumnFamily deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false, ThreadSafeSortedColumns.factory());
+        return deserialize(dis, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
     }
 
-    public ColumnFamily deserialize(DataInput dis, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+    public ColumnFamily deserialize(DataInput dis, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
     {
         if (!dis.readBoolean())
             return null;
@@ -128,22 +129,22 @@ public class ColumnFamilySerializer impl
             throw new UnserializableColumnFamilyException("Couldn't find cfId=" + cfId, cfId);
         ColumnFamily cf = ColumnFamily.create(cfId, factory);
         deserializeFromSSTableNoColumns(cf, dis);
-        deserializeColumns(dis, cf, fromRemote);
+        deserializeColumns(dis, cf, flag);
         return cf;
     }
 
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, boolean fromRemote) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, IColumnSerializer.Flag flag) throws IOException
     {
         int size = dis.readInt();
-        deserializeColumns(dis, cf, size, fromRemote);
+        deserializeColumns(dis, cf, size, flag);
     }
 
     /* column count is already read from DataInput */
-    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, boolean fromRemote) throws IOException
+    public void deserializeColumns(DataInput dis, ColumnFamily cf, int size, IColumnSerializer.Flag flag) throws IOException
     {
         for (int i = 0; i < size; ++i)
         {
-            IColumn column = cf.getColumnSerializer().deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+            IColumn column = cf.getColumnSerializer().deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
             cf.addColumn(column);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Nov 11 16:55:33 2011
@@ -1772,6 +1772,12 @@ public class ColumnFamilyStore implement
         return data.getEstimatedColumnCountHistogram();
     }
 
+    @Override
+    public double getCompressionRatio()
+    {
+        return data.getCompressionRatio();
+    }
+    
     /** true if this CFS contains secondary index data */
     public boolean isIndex()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Fri Nov 11 16:55:33 2011
@@ -213,6 +213,7 @@ public interface ColumnFamilyStoreMBean
 
     public long[] getEstimatedRowSizeHistogram();
     public long[] getEstimatedColumnCountHistogram();
+    public double getCompressionRatio();
 
     /**
      * Returns a list of the names of the built column indexes for current store

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Fri Nov 11 16:55:33 2011
@@ -70,7 +70,7 @@ public class ColumnSerializer implements
 
     public Column deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false);
+        return deserialize(dis, Flag.LOCAL);
     }
 
     /*
@@ -78,12 +78,12 @@ public class ColumnSerializer implements
      * deserialize comes from a remote host. If it does, then we must clear
      * the delta.
      */
-    public Column deserialize(DataInput dis, boolean fromRemote) throws IOException
+    public Column deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
     {
-        return deserialize(dis, fromRemote, (int) (System.currentTimeMillis() / 1000));
+        return deserialize(dis, flag, (int) (System.currentTimeMillis() / 1000));
     }
 
-    public Column deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+    public Column deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         if (name.remaining() <= 0)
@@ -104,7 +104,7 @@ public class ColumnSerializer implements
             long timestampOfLastDelete = dis.readLong();
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            return CounterColumn.create(name, value, ts, timestampOfLastDelete, fromRemote);
+            return CounterColumn.create(name, value, ts, timestampOfLastDelete, flag);
         }
         else if ((b & EXPIRATION_MASK) != 0)
         {
@@ -112,7 +112,7 @@ public class ColumnSerializer implements
             int expiration = dis.readInt();
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore);
+            return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore, flag);
         }
         else
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Fri Nov 11 16:55:33 2011
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.context.C
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.service.IWriteResponseHandler;
@@ -76,11 +77,11 @@ public class CounterColumn extends Colum
         this.timestampOfLastDelete = timestampOfLastDelete;
     }
 
-    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, boolean fromRemote)
+    public static CounterColumn create(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete, IColumnSerializer.Flag flag)
     {
         // #elt being negative means we have to clean delta
         short count = value.getShort(value.position());
-        if (fromRemote || count < 0)
+        if (flag == IColumnSerializer.Flag.FROM_REMOTE || (flag == IColumnSerializer.Flag.LOCAL && count < 0))
             value = CounterContext.instance().clearAllDelta(value);
         return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Nov 11 16:55:33 2011
@@ -412,6 +412,21 @@ public class DataTracker
         return histogram;
     }
 
+    public double getCompressionRatio()
+    {
+        double sum = 0;
+        int total = 0;
+        for (SSTableReader sstable : getSSTables())
+        {
+            if (sstable.getCompressionRatio() != Double.MIN_VALUE)
+            {
+                sum += sstable.getCompressionRatio();
+                total++;
+            }
+        }
+        return total != 0 ? (double)sum/total: 0;
+    }
+
     public long getMinRowSize()
     {
         long min = 0;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Fri Nov 11 16:55:33 2011
@@ -25,6 +25,7 @@ import java.security.MessageDigest;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -60,9 +61,9 @@ public class ExpiringColumn extends Colu
     }
 
     /** @return Either a DeletedColumn, or an ExpiringColumn. */
-    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore)
+    public static Column create(ByteBuffer name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, IColumnSerializer.Flag flag)
     {
-        if (localExpirationTime >= expireBefore)
+        if (localExpirationTime >= expireBefore || flag == IColumnSerializer.Flag.PRESERVE_SIZE)
             return new ExpiringColumn(name, value, timestamp, timeToLive, localExpirationTime);
         // the column is now expired, we can safely return a simple tombstone
         return new DeletedColumn(name, localExpirationTime, timestamp);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Fri Nov 11 16:55:33 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -105,7 +106,7 @@ class ReadResponseSerializer implements 
         if (!isDigest)
         {
             // This is coming from a remote host
-            row = Row.serializer().deserialize(dis, version, true, ArrayBackedSortedColumns.factory());
+            row = Row.serializer().deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE, ArrayBackedSortedColumns.factory());
         }
 
         return isDigest ? new ReadResponse(ByteBuffer.wrap(digest)) : new ReadResponse(row);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Fri Nov 11 16:55:33 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.io.*;
 
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -61,15 +62,15 @@ public class Row
             ColumnFamily.serializer().serialize(row.cf, dos);
         }
 
-        public Row deserialize(DataInput dis, int version, boolean fromRemote, ISortedColumns.Factory factory) throws IOException
+        public Row deserialize(DataInput dis, int version, IColumnSerializer.Flag flag, ISortedColumns.Factory factory) throws IOException
         {
             return new Row(StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(dis)),
-                           ColumnFamily.serializer().deserialize(dis, fromRemote, factory));
+                           ColumnFamily.serializer().deserialize(dis, flag, factory));
         }
 
         public Row deserialize(DataInput dis, int version) throws IOException
         {
-            return deserialize(dis, version, false, ThreadSafeSortedColumns.factory());
+            return deserialize(dis, version, IColumnSerializer.Flag.LOCAL, ThreadSafeSortedColumns.factory());
         }
 
         public long serializedSize(Row row, int version)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Nov 11 16:55:33 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.Message;
@@ -388,7 +389,7 @@ public class RowMutation implements IMut
             }
         }
 
-        public RowMutation deserialize(DataInput dis, int version, boolean fromRemote) throws IOException
+        public RowMutation deserialize(DataInput dis, int version, IColumnSerializer.Flag flag) throws IOException
         {
             String table = dis.readUTF();
             ByteBuffer key = ByteBufferUtil.readWithShortLength(dis);
@@ -397,7 +398,7 @@ public class RowMutation implements IMut
             for (int i = 0; i < size; ++i)
             {
                 Integer cfid = Integer.valueOf(dis.readInt());
-                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, fromRemote, ThreadSafeSortedColumns.factory());
+                ColumnFamily cf = ColumnFamily.serializer().deserialize(dis, flag, ThreadSafeSortedColumns.factory());
                 modifications.put(cfid, cf);
             }
             return new RowMutation(table, key, modifications);
@@ -405,7 +406,7 @@ public class RowMutation implements IMut
 
         public RowMutation deserialize(DataInput dis, int version) throws IOException
         {
-            return deserialize(dis, version, true);
+            return deserialize(dis, version, IColumnSerializer.Flag.FROM_REMOTE);
         }
 
         public long serializedSize(RowMutation rm, int version)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Nov 11 16:55:33 2011
@@ -336,15 +336,15 @@ class SuperColumnSerializer implements I
 
     public IColumn deserialize(DataInput dis) throws IOException
     {
-        return deserialize(dis, false);
+        return deserialize(dis, IColumnSerializer.Flag.LOCAL);
     }
 
-    public IColumn deserialize(DataInput dis, boolean fromRemote) throws IOException
+    public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag) throws IOException
     {
-        return deserialize(dis, fromRemote, (int)(System.currentTimeMillis() / 1000));
+        return deserialize(dis, flag, (int)(System.currentTimeMillis() / 1000));
     }
 
-    public IColumn deserialize(DataInput dis, boolean fromRemote, int expireBefore) throws IOException
+    public IColumn deserialize(DataInput dis, IColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         ByteBuffer name = ByteBufferUtil.readWithShortLength(dis);
         int localDeleteTime = dis.readInt();
@@ -357,7 +357,7 @@ class SuperColumnSerializer implements I
         /* read the number of columns */
         int size = dis.readInt();
         ColumnSerializer serializer = Column.serializer();
-        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, fromRemote, expireBefore);
+        ColumnSortedMap preSortedMap = new ColumnSortedMap(comparator, serializer, dis, size, flag, expireBefore);
         SuperColumn superColumn = new SuperColumn(name, ThreadSafeSortedColumns.factory().fromSorted(preSortedMap, false));
         superColumn.delete(localDeleteTime, markedForDeleteAt);
         return superColumn;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Fri Nov 11 16:55:33 2011
@@ -42,6 +42,7 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -281,7 +282,7 @@ public class CommitLog implements Commit
                     {
                         // assuming version here. We've gone to lengths to make sure what gets written to the CL is in
                         // the current version.  so do make sure the CL is drained prior to upgrading a node.
-                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, false);
+                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), MessagingService.version_, IColumnSerializer.Flag.LOCAL);
                     }
                     catch (UnserializableColumnFamilyException ex)
                     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Fri Nov 11 16:55:33 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -55,6 +56,7 @@ public class ColumnFamilyRecordReader ex
     private RowIterator iter;
     private Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> currentRow;
     private SlicePredicate predicate;
+    private boolean isEmptyPredicate;
     private int totalRowCount; // total number of rows to fetch
     private int batchRowCount; // fetch this many per batch
     private String cfName;
@@ -89,11 +91,33 @@ public class ColumnFamilyRecordReader ex
         return ((float)iter.rowsRead()) / totalRowCount;
     }
     
+    /**
+     * @return true when the predicate selects every column: it is null, has no
+     *         slice range and no column names, or its slice range is unbounded
+     *         (null or zero-length start and finish) on both ends.
+     */
+    static boolean isEmptyPredicate(SlicePredicate predicate)
+    {
+        if (predicate == null)
+            return true;
+        if (predicate.isSetColumn_names() && predicate.getSlice_range() == null)
+            return false;
+        if (predicate.getSlice_range() == null)
+            return true;
+
+        // test the length: "==" against ArrayUtils.EMPTY_BYTE_ARRAY only matches
+        // that one instance, so any other zero-length array was treated as a bound
+        byte[] start  = predicate.getSlice_range().getStart();
+        byte[] finish = predicate.getSlice_range().getFinish();
+        return (start == null || start.length == 0) && (finish == null || finish.length == 0);
+    }
+    
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
     {
         this.split = (ColumnFamilySplit) split;
         Configuration conf = context.getConfiguration();
         predicate = ConfigHelper.getInputSlicePredicate(conf);
+        isEmptyPredicate = isEmptyPredicate(predicate);
         totalRowCount = ConfigHelper.getInputSplitSize(conf);
         batchRowCount = ConfigHelper.getRangeBatchSize(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
@@ -237,6 +261,7 @@ public class ColumnFamilyRecordReader ex
             } 
             else if (startToken.equals(split.getEndToken()))
             {
+                // reached end of the split
                 rows = null;
                 return;
             }
@@ -257,14 +282,37 @@ public class ColumnFamilyRecordReader ex
                     rows = null;
                     return;
                 }
-                               
-                // reset to iterate through this new batch
-                i = 0;
                 
                 // prepare for the next slice to be read
                 KeySlice lastRow = rows.get(rows.size() - 1);
                 ByteBuffer rowkey = lastRow.key;
                 startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rowkey));
+                
+                // remove ghosts when fetching all columns
+                if (isEmptyPredicate)
+                {
+                    Iterator<KeySlice> it = rows.iterator();
+                    
+                    while(it.hasNext())
+                    {
+                        KeySlice ks = it.next();
+                        
+                        if (ks.getColumnsSize() == 0)
+                        {
+                           it.remove();
+                        }
+                    }
+                
+                    // all ghosts, spooky
+                    if (rows.isEmpty())
+                    {
+                        maybeInit();
+                        return;
+                    }
+                }
+                
+                // reset to iterate through this new batch
+                i = 0;             
             }
             catch (Exception e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/IColumnSerializer.java Fri Nov 11 16:55:33 2011
@@ -27,5 +27,21 @@ import org.apache.cassandra.db.IColumn;
 
 public interface IColumnSerializer extends ISerializer<IColumn>
 {
-    public IColumn deserialize(DataInput in, boolean fromRemote, int expireBefore) throws IOException;
+    /**
+     * Flag affecting deserialization behavior.
+     *  - LOCAL: for deserialization of local data. Expired columns are
+     *      converted to tombstones (to gain disk space).
+     *  - FROM_REMOTE: for deserialization of data received from remote hosts.
+     *      Expired columns are converted to tombstones and counters have
+     *      their deltas cleared.
+     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e.,
+     *      when we must ensure that deserializing and reserializing the
+     *      result yields the exact same bytes. Streaming uses this.
+     */
+    public static enum Flag
+    {
+        LOCAL, FROM_REMOTE, PRESERVE_SIZE;
+    }
+
+    public IColumn deserialize(DataInput in, Flag flag, int expireBefore) throws IOException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java Fri Nov 11 16:55:33 2011
@@ -23,14 +23,15 @@ import java.io.IOException;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 
 public class CompressedSequentialWriter extends SequentialWriter
 {
-    public static SequentialWriter open(String dataFilePath, String indexFilePath, boolean skipIOCache, CompressionParameters parameters) throws IOException
+    public static SequentialWriter open(String dataFilePath, String indexFilePath, boolean skipIOCache, CompressionParameters parameters, Collector sstableMetadataCollector) throws IOException
     {
-        return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters);
+        return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
     }
 
     // holds offset in the file where current chunk should be written
@@ -49,7 +50,11 @@ public class CompressedSequentialWriter 
 
     private final Checksum checksum = new CRC32();
 
-    public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters) throws IOException
+    private long originalSize = 0, compressedSize = 0;
+
+    private Collector sstableMetadataCollector;
+    
+    public CompressedSequentialWriter(File file, String indexFilePath, boolean skipIOCache, CompressionParameters parameters, Collector sstableMetadataCollector) throws IOException
     {
         super(file, parameters.chunkLength(), skipIOCache);
         this.compressor = parameters.sstableCompressor;
@@ -60,6 +65,7 @@ public class CompressedSequentialWriter 
         /* Index File (-CompressionInfo.db component) and it's header */
         metadataWriter = new CompressionMetadata.Writer(indexFilePath);
         metadataWriter.writeHeader(parameters);
+        this.sstableMetadataCollector = sstableMetadataCollector;
     }
 
     @Override
@@ -82,6 +88,9 @@ public class CompressedSequentialWriter 
         // compressing data with buffer re-use
         int compressedLength = compressor.compress(buffer, 0, validBufferBytes, compressed, 0);
 
+        originalSize += validBufferBytes;
+        compressedSize += compressedLength;
+        
         // update checksum
         checksum.update(buffer, 0, validBufferBytes);
 
@@ -179,7 +188,7 @@ public class CompressedSequentialWriter 
             return; // already closed
 
         super.close();
-
+        sstableMetadataCollector.addCompressionRatio(compressedSize, originalSize);
         metadataWriter.finalizeHeader(current, chunkCount);
         metadataWriter.close();
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Fri Nov 11 16:55:33 2011
@@ -56,7 +56,7 @@ public class Descriptor
     // f (0.7.0): switched bloom filter implementations in data component
     // g (0.8): tracks flushed-at context in metadata component
     // h (1.0): tracks max client timestamp in metadata component
-    public static final String CURRENT_VERSION = "h";
+    public static final String CURRENT_VERSION = "hb";
 
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
@@ -74,6 +74,7 @@ public class Descriptor
     public final boolean usesOldBloomFilter;
     public final boolean metadataIncludesReplayPosition;
     public final boolean tracksMaxTimestamp;
+    public final boolean hasCompressionRatio;
 
     public enum TempState
     {
@@ -115,6 +116,7 @@ public class Descriptor
         usesOldBloomFilter = version.compareTo("f") < 0;
         metadataIncludesReplayPosition = version.compareTo("g") >= 0;
         tracksMaxTimestamp = version.compareTo("h") >= 0;
+        hasCompressionRatio = version.compareTo("hb") >= 0;
         isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Fri Nov 11 16:55:33 2011
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.BytesReadTracker;
 
@@ -41,7 +42,7 @@ public class SSTableIdentityIterator imp
     private final DataInput input;
     private final long dataStart;
     public final long dataSize;
-    public final boolean fromRemote;
+    public final IColumnSerializer.Flag flag;
 
     private final ColumnFamily columnFamily;
     private final int columnCount;
@@ -82,17 +83,17 @@ public class SSTableIdentityIterator imp
     public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData)
     throws IOException
     {
-        this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, false);
+        this(sstable.metadata, file, key, dataStart, dataSize, checkData, sstable, IColumnSerializer.Flag.LOCAL);
     }
 
-    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, boolean fromRemote)
+    public SSTableIdentityIterator(CFMetaData metadata, DataInput file, DecoratedKey<?> key, long dataStart, long dataSize, IColumnSerializer.Flag flag)
     throws IOException
     {
-        this(metadata, file, key, dataStart, dataSize, false, null, fromRemote);
+        this(metadata, file, key, dataStart, dataSize, false, null, flag);
     }
 
     // sstable may be null *if* deserializeRowHeader is false
-    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, boolean fromRemote)
+    private SSTableIdentityIterator(CFMetaData metadata, DataInput input, DecoratedKey<?> key, long dataStart, long dataSize, boolean checkData, SSTableReader sstable, IColumnSerializer.Flag flag)
     throws IOException
     {
         this.input = input;
@@ -101,7 +102,7 @@ public class SSTableIdentityIterator imp
         this.dataStart = dataStart;
         this.dataSize = dataSize;
         this.expireBefore = (int)(System.currentTimeMillis() / 1000);
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.validateColumns = checkData;
 
         try
@@ -173,7 +174,7 @@ public class SSTableIdentityIterator imp
     {
         try
         {
-            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, fromRemote, expireBefore);
+            IColumn column = columnFamily.getColumnSerializer().deserialize(inputWithTracker, flag, expireBefore);
             if (validateColumns)
                 column.validateFields(columnFamily.metadata());
             return column;
@@ -228,7 +229,7 @@ public class SSTableIdentityIterator imp
         assert inputWithTracker.getBytesRead() == headerSize();
         ColumnFamily cf = columnFamily.cloneMeShallow(ArrayBackedSortedColumns.factory(), false);
         // since we already read column count, just pass that value and continue deserialization
-        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, fromRemote);
+        ColumnFamily.serializer().deserializeColumns(inputWithTracker, cf, columnCount, flag);
         if (validateColumns)
         {
             try

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java Fri Nov 11 16:55:33 2011
@@ -51,19 +51,21 @@ public class SSTableMetadata
     protected final EstimatedHistogram estimatedColumnCount;
     protected final ReplayPosition replayPosition;
     protected final long maxTimestamp;
+    protected final double compressionRatio;
     public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
 
     private SSTableMetadata()
     {
-        this(defaultRowSizeHistogram(), defaultColumnCountHistogram(), ReplayPosition.NONE, Long.MIN_VALUE);
+        this(defaultRowSizeHistogram(), defaultColumnCountHistogram(), ReplayPosition.NONE, Long.MIN_VALUE, Double.MIN_VALUE);
     }
 
-    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp)
+    private SSTableMetadata(EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition replayPosition, long maxTimestamp, double cr)
     {
         this.estimatedRowSize = rowSizes;
         this.estimatedColumnCount = columnCounts;
         this.replayPosition = replayPosition;
         this.maxTimestamp = maxTimestamp;
+        this.compressionRatio = cr;
     }
 
     public static SSTableMetadata createDefaultInstance()
@@ -96,6 +98,11 @@ public class SSTableMetadata
         return maxTimestamp;
     }
 
+    public double getCompressionRatio()
+    {
+        return compressionRatio;
+    }
+
     static EstimatedHistogram defaultColumnCountHistogram()
     {
         // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
@@ -114,6 +121,7 @@ public class SSTableMetadata
         protected EstimatedHistogram estimatedColumnCount;
         protected ReplayPosition replayPosition;
         protected long maxTimestamp;
+        protected double compressionRatio;
 
         private Collector()
         {
@@ -121,6 +129,7 @@ public class SSTableMetadata
             this.estimatedColumnCount = defaultColumnCountHistogram();
             this.replayPosition = ReplayPosition.NONE;
             this.maxTimestamp = Long.MIN_VALUE;
+            this.compressionRatio = Double.MIN_VALUE;
         }
 
         public void addRowSize(long rowSize)
@@ -133,6 +142,15 @@ public class SSTableMetadata
             estimatedColumnCount.add(columnCount);
         }
 
+        /** Records compressed/uncompressed; above 1.0 means compression isn't helping. */
+        public void addCompressionRatio(long compressed, long uncompressed)
+        {
+            // nothing written: keep the current (unset) value rather than storing NaN/Infinity
+            if (uncompressed == 0)
+                return;
+            compressionRatio = (double) compressed / uncompressed;
+        }
+        
         public void updateMaxTimestamp(long potentialMax)
         {
             maxTimestamp = Math.max(maxTimestamp, potentialMax);
@@ -140,7 +158,7 @@ public class SSTableMetadata
 
         public SSTableMetadata finalizeMetadata()
         {
-            return new SSTableMetadata(estimatedRowSize, estimatedColumnCount, replayPosition, maxTimestamp);
+            return new SSTableMetadata(estimatedRowSize, estimatedColumnCount, replayPosition, maxTimestamp, compressionRatio);
         }
 
         public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -172,6 +190,7 @@ public class SSTableMetadata
             EstimatedHistogram.serializer.serialize(sstableStats.getEstimatedColumnCount(), dos);
             ReplayPosition.serializer.serialize(sstableStats.getReplayPosition(), dos);
             dos.writeLong(sstableStats.getMaxTimestamp());
+            dos.writeDouble(sstableStats.getCompressionRatio());
         }
 
         public SSTableMetadata deserialize(Descriptor descriptor) throws IOException
@@ -187,7 +206,7 @@ public class SSTableMetadata
             DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
             try
             {
-                return deserialize(dis, descriptor.metadataIncludesReplayPosition, descriptor.tracksMaxTimestamp);
+                return deserialize(dis, descriptor);
             }
             finally
             {
@@ -195,16 +214,18 @@ public class SSTableMetadata
             }
         }
 
-        public SSTableMetadata deserialize(DataInputStream dis, boolean includesReplayPosition, boolean tracksMaxTimestamp) throws IOException
+        public SSTableMetadata deserialize(DataInputStream dis, Descriptor desc) throws IOException
         {
             EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
-            ReplayPosition replayPosition = includesReplayPosition
+            ReplayPosition replayPosition = desc.metadataIncludesReplayPosition
                                           ? ReplayPosition.serializer.deserialize(dis)
                                           : ReplayPosition.NONE;
-            long maxTimestamp = tracksMaxTimestamp ? dis.readLong() : Long.MIN_VALUE;
-
-            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp);
+            long maxTimestamp = desc.tracksMaxTimestamp ? dis.readLong() : Long.MIN_VALUE;
+            double compressionRatio = desc.hasCompressionRatio
+                                        ? dis.readDouble()
+                                        : Double.MIN_VALUE;
+            return new SSTableMetadata(rowSizes, columnCounts, replayPosition, maxTimestamp, compressionRatio);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Nov 11 16:55:33 2011
@@ -899,6 +899,11 @@ public class SSTableReader extends SSTab
         return sstableMetadata.getEstimatedColumnCount();
     }
 
+    public double getCompressionRatio()
+    {
+        return sstableMetadata.getCompressionRatio();
+    }
+
     public ReplayPosition getReplayPosition()
     {
         return sstableMetadata.getReplayPosition();

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Nov 11 16:55:33 2011
@@ -34,6 +34,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
@@ -89,7 +90,8 @@ public class SSTableWriter extends SSTab
             dataFile = CompressedSequentialWriter.open(getFilename(),
                                                        descriptor.filenameFor(Component.COMPRESSION_INFO),
                                                        true,
-                                                       metadata.compressionParameters());
+                                                       metadata.compressionParameters(),
+                                                       sstableMetadataCollector);
         }
         else
         {
@@ -230,8 +232,9 @@ public class SSTableWriter extends SSTab
         ColumnFamily cf = ColumnFamily.create(metadata, ArrayBackedSortedColumns.factory());
         for (int i = 0; i < columnCount; i++)
         {
-            // deserialize column with fromRemote false, in order to keep size of streamed column
-            IColumn column = cf.getColumnSerializer().deserialize(in, false, Integer.MIN_VALUE);
+            // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
+            // data size received, so we must reserialize the exact same data
+            IColumn column = cf.getColumnSerializer().deserialize(in, IColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE);
             if (column instanceof CounterColumn)
             {
                 column = ((CounterColumn) column).markDeltaToBeCleared();

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/ColumnSortedMap.java Fri Nov 11 16:55:33 2011
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.io.IColumnSerializer;
 
 /**
  * Facade over a DataInput that contains IColumns in sorted order.
@@ -43,16 +44,16 @@ public class ColumnSortedMap implements 
     private final DataInput dis;
     private final Comparator<ByteBuffer> comparator;
     private final int length;
-    private final boolean fromRemote;
+    private final IColumnSerializer.Flag flag;
     private final int expireBefore;
 
-    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+    public ColumnSortedMap(Comparator<ByteBuffer> comparator, ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
     {
         this.comparator = comparator;
         this.serializer = serializer;
         this.dis = dis;
         this.length = length;
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.expireBefore = expireBefore;
     }
 
@@ -143,7 +144,7 @@ public class ColumnSortedMap implements 
 
     public Set<Map.Entry<ByteBuffer, IColumn>> entrySet()
     {
-        return new ColumnSet(serializer, dis, length, fromRemote, expireBefore);
+        return new ColumnSet(serializer, dis, length, flag, expireBefore);
     }
 }
 
@@ -152,15 +153,15 @@ class ColumnSet implements Set<Map.Entry
     private final ColumnSerializer serializer;
     private final DataInput dis;
     private final int length;
-    private boolean fromRemote;
+    private IColumnSerializer.Flag flag;
     private final int expireBefore;
 
-    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+    public ColumnSet(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
     {
         this.serializer = serializer;
         this.dis = dis;
         this.length = length;
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.expireBefore = expireBefore;
     }
 
@@ -181,7 +182,7 @@ class ColumnSet implements Set<Map.Entry
 
     public Iterator<Entry<ByteBuffer, IColumn>> iterator()
     {
-        return new ColumnIterator(serializer, dis, length, fromRemote, expireBefore);
+        return new ColumnIterator(serializer, dis, length, flag, expireBefore);
     }
 
     public Object[] toArray()
@@ -234,16 +235,16 @@ class ColumnIterator implements Iterator
     private final ColumnSerializer serializer;
     private final DataInput dis;
     private final int length;
-    private final boolean fromRemote;
+    private final IColumnSerializer.Flag flag;
     private int count = 0;
     private final int expireBefore;
 
-    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, boolean fromRemote, int expireBefore)
+    public ColumnIterator(ColumnSerializer serializer, DataInput dis, int length, IColumnSerializer.Flag flag, int expireBefore)
     {
         this.dis = dis;
         this.serializer = serializer;
         this.length = length;
-        this.fromRemote = fromRemote;
+        this.flag = flag;
         this.expireBefore = expireBefore;
     }
 
@@ -252,7 +253,7 @@ class ColumnIterator implements Iterator
         try
         {
             count++;
-            return serializer.deserialize(dis, fromRemote, expireBefore);
+            return serializer.deserialize(dis, flag, expireBefore);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Fri Nov 11 16:55:33 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.PrecompactedRow;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
@@ -129,7 +130,8 @@ public class IncomingStreamReader
                         // need to update row cache
                         if (controller == null)
                             controller = new CompactionController(cfs, Collections.<SSTableReader>emptyList(), Integer.MIN_VALUE, true);
-                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, true);
+                        // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
+                        SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, key, 0, dataSize, IColumnSerializer.Flag.FROM_REMOTE);
                         PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
                         // We don't expire anything so the row shouldn't be empty
                         assert !row.isEmpty();

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java Fri Nov 11 16:55:33 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.db.context.CounterContext;
 import static org.apache.cassandra.db.context.CounterContext.ContextState;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IColumnSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.Allocator;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -295,7 +296,7 @@ public class CounterColumnTest extends S
         assert original.equals(deserialized);
 
         bufIn = new ByteArrayInputStream(serialized, 0, serialized.length);
-        CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), true);
+        CounterColumn deserializedOnRemote = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn), IColumnSerializer.Flag.FROM_REMOTE);
         assert deserializedOnRemote.name().equals(original.name());
         assert deserializedOnRemote.total() == original.total();
         assert deserializedOnRemote.value().equals(cc.clearAllDelta(original.value()));

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java Fri Nov 11 16:55:33 2011
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
 
 import org.junit.Test;
 
+import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.util.*;
 
 import static org.junit.Assert.assertEquals;
@@ -54,8 +55,9 @@ public class CompressedRandomAccessReade
 
         try
         {
+            SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(null);
             SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance))
+                ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
                 : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
 
             writer.write("The quick ".getBytes());
@@ -104,7 +106,8 @@ public class CompressedRandomAccessReade
         File metadata = new File(file.getPath() + ".meta");
         metadata.deleteOnExit();
 
-        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance));
+        SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(null);
+        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());
         writer.close();

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataSerializerTest.java Fri Nov 11 16:55:33 2011
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
 
 import org.junit.Test;
@@ -58,7 +59,8 @@ public class SSTableMetadataSerializerTe
 
         ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
         DataInputStream dis = new DataInputStream(byteInput);
-        SSTableMetadata stats = SSTableMetadata.serializer.deserialize(dis, true, true);
+        Descriptor desc = new Descriptor(Descriptor.CURRENT_VERSION, new File("."), "", "", 0, false);
+        SSTableMetadata stats = SSTableMetadata.serializer.deserialize(dis, desc);
 
         assert stats.getEstimatedRowSize().equals(originalMetadata.getEstimatedRowSize());
         assert stats.getEstimatedRowSize().equals(rowSizes);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1200948&r1=1200947&r2=1200948&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Nov 11 16:55:33 2011
@@ -83,19 +83,13 @@ public class StreamingTransferTest exten
 
         // transfer the first and last key
         logger.debug("Transferring " + cfs.columnFamily);
-        int[] offs = new int[]{1, 3};
-        IPartitioner p = StorageService.getPartitioner();
-        List<Range> ranges = new ArrayList<Range>();
-        ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
-        ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
-        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
-        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
-        session.await();
+        transfer(table, sstable);
 
         // confirm that a single SSTable was transferred and registered
         assertEquals(1, cfs.getSSTables().size());
 
         // and that the index and filter were properly recovered
+        int[] offs = new int[]{1, 3};
         List<Row> rows = Util.getRangeSlice(cfs);
         assertEquals(offs.length, rows.size());
         for (int i = 0; i < offs.length; i++)
@@ -119,6 +113,17 @@ public class StreamingTransferTest exten
         return keys;
     }
 
+    private void transfer(Table table, SSTableReader sstable) throws Exception
+    {
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range> ranges = new ArrayList<Range>();
+        ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
+        ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
+        StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
+        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
+        session.await();
+    }
+
     @Test
     public void testTransferTable() throws Exception
     {
@@ -222,6 +227,12 @@ public class StreamingTransferTest exten
             .write(cleanedEntries);
         SSTableReader streamed = cfs.getSSTables().iterator().next();
         SSTableUtils.assertContentEquals(cleaned, streamed);
+
+        // Retransfer the file, making sure it is now idempotent (see CASSANDRA-3481)
+        cfs.clearUnsafe();
+        transfer(table, streamed);
+        SSTableReader restreamed = cfs.getSSTables().iterator().next();
+        SSTableUtils.assertContentEquals(streamed, restreamed);
     }
 
     @Test
@@ -320,7 +331,7 @@ public class StreamingTransferTest exten
             assertEquals(entry.getKey(), rows.get(0).key);
         }
     }
- 
+
     public interface Mutator
     {
         public void mutate(String key, String col, long timestamp) throws Exception;